Compare commits

...

90 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
297246fa4c Merge pull request #92 from yuyangh/yuyangh/add_ASHA_GATT
add ASHA profile
2022-12-07 13:16:01 -08:00
Yuyang Huang
52db1cfcc1 improve code style 2022-12-06 07:38:05 -08:00
Yuyang Huang
29f9a79502 improve get service advertising data 2022-12-05 11:22:07 -08:00
Gilles Boccon-Gibod
c86125de4f Merge pull request #93 from AlanRosenthal/alan/add_default_services
Add Device::add_default_services()
2022-12-01 12:36:03 -08:00
Yuyang Huang
697d5df3f8 code style update 2022-12-01 10:50:15 -08:00
Yuyang Huang
87aa4f617e add ASHA advertising factory method 2022-12-01 10:40:30 -08:00
Alan Rosenthal
a8eff737e6 Add Device::add_default_services()
This will allow a test to:
a: add services to a device
b: reset services via `Server()`
c: add the default services back
2022-12-01 17:02:54 +00:00
Gilles Boccon-Gibod
4417eb636c Merge pull request #83 from AlanRosenthal/alan/pytest_fixes_2
Test all python versions in CI
2022-11-29 12:48:35 -08:00
Alan Rosenthal
f4e5e61bbb Test all python versions in CI
Followed instructions here: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#using-the-python-starter-workflow
2022-11-29 20:22:56 +00:00
Lucas Abel
ba7a60025f Merge pull request #89 from google/uael/misc
Typos & CI fixes
2022-11-29 12:14:03 -08:00
Yuyang Huang
d92b7e9b74 add ASHA Profile 2022-11-29 10:34:59 -08:00
Yuyang Huang
b0336adf1c add ASHA GATT UUID 2022-11-29 10:02:40 -08:00
Abel Lucas
691450c7de gatt: fix CharacteristicDeclaration.__str__ and associated test 2022-11-29 16:43:47 +00:00
Abel Lucas
99a0eb21c1 address: fix deprecated use of combined @classmethod and @property 2022-11-29 16:33:12 +00:00
Abel Lucas
ab4859bd94 device: fix typos 2022-11-29 16:33:12 +00:00
Lucas Abel
0d70cbde64 Merge pull request #75 from google/uael/fixes
Pairing: device/host fixes & improvements
2022-11-28 21:42:43 -08:00
Gilles Boccon-Gibod
f41d0682b2 Merge pull request #80 from AlanRosenthal/alan/gatt_server_getter
Added class CharacteristicDeclaration, gatt_server getters
2022-11-28 19:21:08 -08:00
Gilles Boccon-Gibod
062dc1e53d Merge pull request #85 from AlanRosenthal/alan/gatt_server_console2
Add `bumble-console --device-config` support for gatt services
2022-11-28 19:19:25 -08:00
Abel Lucas
662704e551 classic: complete authentication when being the .authenticate acceptor 2022-11-29 00:28:39 +00:00
Abel Lucas
02a474c44e smp: emit enough information on pairing complete to deduce security level 2022-11-29 00:28:38 +00:00
Abel Lucas
a1c7aec492 device: fix .find_connection_by_bd_addr 2022-11-29 00:28:38 +00:00
Abel Lucas
6112f00049 device: introduce BR/EDR pending connections
This commit enable the BR/EDR pairing to run asynchronously to
the connection being established.

When in security mode 3, a controller shall start authentication as
part of the connection, which result in HCI events being sent on a BD
address without a completed connection (ie. no connection handle).
2022-11-29 00:28:38 +00:00
Alan Rosenthal
f56ac14f2c Add bumble-console --device-config support for gatt services
This PR adds support for bumble-console to be preloaded with gatt services via `--device-config`.
This PR also adds some type annotations
2022-11-28 14:11:27 -05:00
Gilles Boccon-Gibod
a739fc71ce Merge pull request #84 from google/gbg/78
use libusb auto-detach feature
2022-11-27 12:42:02 -08:00
Alan Rosenthal
b89f9030a0 Added class CharacteristicDeclaration, gatt_server getters
* Converted CharacteristicDeclaration implementation to class
* Added ability to get a gatt_server attribute by service UUID, characteristics UUID, descriptor UUID
2022-11-27 19:22:25 +00:00
Gilles Boccon-Gibod
9e5a85bd10 use libusb auto-detach feature 2022-11-25 17:52:13 -08:00
Gilles Boccon-Gibod
b437bd8619 Merge pull request #82 from AlanRosenthal/alan/pytest_fixes
Fix test failures
2022-11-25 13:19:53 -08:00
Alan Rosenthal
a3e4674819 Fix test failures
a. `DeprecationWarning: The 'warn' method is deprecated, use 'warning' instead`
Updated call in `bumble/smp.py`

b. `ModuleNotFoundError: No module named 'bumble.apps'`
Updated imports in `tests/import_test.py`

c. Added `pytest-html` for easier viewing of test results
Added package in `setup.cfg`, and hook in `tasks.py`

d. Updated workflows to use `invoke test`

This is a partial fix of #81
2022-11-23 11:31:27 -05:00
Abel Lucas
5f1d57fcb0 device: simplify and fixes remote name request 2022-11-22 21:20:56 +00:00
Gilles Boccon-Gibod
ae0b739e4a Merge pull request #79 from google/gbg/fix-host-reset
fix sequencing logic broken by earlier merge
2022-11-22 09:16:26 -08:00
Michael Mogenson
0570b59796 Merge pull request #77 from mogenson/l2cap-on-event
Fix for 'Host' object has no attribute 'add_listener'
2022-11-22 09:39:04 -05:00
Gilles Boccon-Gibod
22218627f6 fix sequencing logic broken by earlier merge 2022-11-21 21:07:47 -08:00
Michael Mogenson
1c72242264 Fix for 'Host' object has no attribute 'add_listener'
Pyee's add_listener() method was not added until release 9.0.0. Bumble's
setup.cfg specifies a minimum pyee version of 8.2.2.

Remove the call to add_listener() in l2cap.py. If the add_listener() API
is prefered over the on(), another solution would be to bump the pyee
version requirement.
2022-11-21 12:31:21 -05:00
Abel Lucas
9c133706e6 keys: add a way to remove all bonds from key store 2022-11-18 18:22:15 +00:00
Michael Mogenson
4988a31487 Merge pull request #76 from mogenson/connection-error-params
Swap arguments to ConnectionError in RFCOMM Multiplexer
2022-11-18 10:48:02 -05:00
Michael Mogenson
e6c062117f Swap arguments to ConnectionError in RFCOMM Multiplexer
Minor fixup. Change the order of arguments to ConnectionError to set the
transport and address correctly in rfcomm.py on_dm_frame().
2022-11-18 10:02:40 -05:00
Gilles Boccon-Gibod
f2133235d5 Merge pull request #73 from google/gbg/faster-l2cap-test
lower the number of test cases for l2cap in order to speed up the test
2022-11-15 10:49:55 -08:00
Gilles Boccon-Gibod
867e8c13dc lower the number of test cases for l2cap in order to speed up the test 2022-11-14 17:26:09 -08:00
Lucas Abel
25ce38c3f5 Merge pull request #72 from google/uael/public-str-address
address: add public information to the stringified value
2022-11-14 17:16:47 -08:00
Abel Lucas
c0810230a6 address: add public information to the stringified value
This affect the way security keys are stored. For instance the same
key can be used both as public and random, and it need to be stored
separately one from each other.
2022-11-14 20:05:12 +00:00
Michael Mogenson
27c46eef9d Merge pull request #71 from mogenson/prefer-notify
Add prefer_notify option to gatt_client subscribe()
2022-11-13 19:53:09 -05:00
Michael Mogenson
c140876157 Add prefer_notify option to gatt_client subscribe()
If characteristic supports Notify and Indicate, the prefer_notify option
will subscribe with Notify if True or Indicate if False.

If characteristic only supports one property, Notify or Indicate, that
mode will be selected, regardless of the prefer_notify setting.

Tested with a characteristic that supports both Notify and Indicate and
verified that prefer_notify sets the desired mode.
2022-11-13 19:38:12 -05:00
Lucas Abel
d743656f09 Merge pull request #68 from google/uael/pairing-improvements
Pairing improvements
2022-11-11 21:03:17 -08:00
Abel Lucas
b91d0e24c1 device: handle HCI passkey notification event 2022-11-11 18:43:35 +00:00
Abel Lucas
eb46f60c87 le: save own_address_type on ACL connection for SMP to be able to use the right self address 2022-11-10 02:06:37 +00:00
Abel Lucas
8bbba7c84c pairing: always ask user for confirmation, even in JUST_WORKS method 2022-11-10 01:58:02 +00:00
Gilles Boccon-Gibod
ee54df2d08 Merge pull request #65 from google/gbg/fix-classic-connect-await
fix classic connection event filtering
2022-11-09 14:40:29 -08:00
Gilles Boccon-Gibod
6549e53398 Merge pull request #60 from google/gbg/fix-console-logs
use a formatter object, not a string
2022-11-09 13:19:26 -08:00
Gilles Boccon-Gibod
0f219eff12 address PR comments 2022-11-09 13:18:30 -08:00
Gilles Boccon-Gibod
4a1345cf95 only force the type if the address is passed as a string 2022-11-08 19:10:13 -08:00
Gilles Boccon-Gibod
8a1cdef152 fix classic connection event filtering 2022-11-08 17:33:29 -08:00
Gilles Boccon-Gibod
6e1baf0344 use a formatter object, not a string 2022-11-08 13:19:41 -08:00
Lucas Abel
cea1905ffb Merge pull request #59 from google/uael/device-cleanup
le: pass `own_address_type` to BLE `Device.connect`
2022-11-08 11:50:40 -08:00
Abel Lucas
af8e0d4dc7 le: pass own_address_type to BLE Device.connect 2022-11-08 18:22:54 +00:00
Gilles Boccon-Gibod
875195aebb Merge pull request #58 from AlanRosenthal/main
Add definition of `Client Characteristic Configuration bit`
2022-11-08 09:34:22 -08:00
Gilles Boccon-Gibod
5aee37aeab Merge pull request #34 from google/gbg/l2cap-bridge
Add L2CAP CoC support
2022-11-07 16:57:17 -08:00
Gilles Boccon-Gibod
edcb7d05d6 fix merge conflict 2022-11-07 16:51:40 -08:00
Gilles Boccon-Gibod
ce9004f0ac Add L2CAP CoC support (squashed)
[85542e0] fix test
[3748781] add ASAH sink example
[e782e29] add app
[83daa30] wip
[7f138a0] add test
[f732108] allow different address syntax
[9d0bbf8] rename deprecated methods
[eb303d5] add LE CoC support
2022-11-07 16:45:37 -08:00
Alan Rosenthal
d4228e3b5b Add definition of Client Characteristic Configuration bit 2022-11-07 19:43:22 -05:00
Lucas Abel
be8f8ac68f Merge pull request #55 from google/uael/device-improvements
Device improvements
2022-11-07 15:22:41 -08:00
Abel Lucas
ca16410a6d device: add option to check for the address type when using find_connection_by_bd_addr 2022-11-07 22:17:01 +00:00
Abel Lucas
b95888eb39 le: permit legacy scanning even when extended is supported 2022-11-07 22:15:54 +00:00
Abel Lucas
56ed46adfa classic: add BR/EDR accept connection logic 2022-11-04 17:26:59 +00:00
Abel Lucas
7044102e05 classic: upgrade Device.cancel_connection logic to support canceling ongoing BR/EDR connections 2022-11-04 17:26:59 +00:00
Abel Lucas
ca8f284888 le: add own_address_type parameter to Device.start_advertising 2022-11-04 17:26:59 +00:00
Abel Lucas
e9e14f5183 le: make the device connecting state relative to LE only
We may need to add a distinct BR/EDR connecting state in the future.
2022-11-04 17:26:59 +00:00
Abel Lucas
b961affd3d device: update Device.connect documentation to match BR/EDR behavior 2022-11-04 17:26:59 +00:00
Abel Lucas
51ddb36c91 device: add auto_restart mechanism to .start_discovery (default to True) 2022-11-04 17:26:59 +00:00
Abel Lucas
78534b659a device: enhance .request_remote_name to also accept an Address as argument 2022-11-04 17:26:59 +00:00
Abel Lucas
ce9472bf42 core: change AdvertisingData.get default raw behavior to False 2022-11-04 17:26:59 +00:00
Abel Lucas
fc331b7aea core: improve Advertisement.ad_data_to_object with support for more data types 2022-11-04 17:26:59 +00:00
Abel Lucas
8119bc210c host: pass remote_host_supported_features event to upper layer
The `HCI_Remote_Name_Request` command may trigger this HCI event.
Instead of warn for not being handled, pass it to upper layer.
2022-11-02 20:23:14 +00:00
Abel Lucas
65deefdc64 host: allow bytes return paramaters when checking command result 2022-11-02 20:23:14 +00:00
Michael Mogenson
2920c3b0d1 Merge pull request #53 from mogenson/mogenson/show-device-tab
Add a show device tab
2022-10-24 09:15:43 -04:00
Michael Mogenson
f5cd825dbc Merge pull request #51 from mogenson/mogenson/console-py-rand-addr
Use random address in console.py if device config is not provided
2022-10-24 09:15:10 -04:00
Gilles Boccon-Gibod
cf4c43c4ff Merge pull request #48 from google/uael/classic-parallel-connect
classic: update `Device.connect` to allow parallels connection creation
2022-10-23 20:52:08 -07:00
Gilles Boccon-Gibod
da2f596e52 Merge pull request #50 from google/uael/command-timeout
device: raise a `CommandTimeoutError` error on command timeout
2022-10-23 20:49:06 -07:00
Gilles Boccon-Gibod
c8aa0b4ef6 Merge pull request #54 from google/gbg/fix-regression-001
use the correct constants as previously renamed
2022-10-23 20:43:43 -07:00
Gilles Boccon-Gibod
75ac276c8b use the correct constants as previously renamed 2022-10-21 17:12:26 -07:00
Michael Mogenson
dd4023ff56 Add a show device tab
Show configuration data about the Bumble device. Make this the default
tab on startup.
2022-10-21 16:00:03 -04:00
Michael Mogenson
dde8c5e1c2 Use random address in console.py if device config is not provided
If a device configuration is not provided on startup, generate a random
BT address instead of using a default static value of
"F0:F1:F2:F3:F4:F5". This is helpful to avoid colisions when there are
two instances of console.py running nearby.

Testing:
Started console.py and began advertising a few times. Scanned from a
second instance of console.py and observed that the advertising address
changed with each restart.
2022-10-21 15:32:58 -04:00
Michael Mogenson
8ed1f4b50d Merge pull request #52 from mogenson/mogenson/console-py-clear-scan-results
add 'scan clear' command to console.py
2022-10-21 14:34:08 -04:00
Michael Mogenson
92de7dff4f add 'scan clear' command to console.py
Add command to clear scan results and known addresses. Useful for
determining if a peripheral has stopped advertising.

Also, check if a scan is in progress before connecting. If it is, stop
scanning. Some BT controllers will fail to connect while scanning.

Testing:
Can clear scan results before, during, and after scan. Can clear scan
results while disconnected and connected.
2022-10-21 13:58:21 -04:00
Abel Lucas
16b4f18c92 tests: add parallel device connection test 2022-10-21 15:49:03 +00:00
Gilles Boccon-Gibod
46f4b82d29 Merge pull request #46 from AlanRosenthal/main
Add runtime switch for filtering by address.
2022-10-20 19:20:28 -07:00
Abel Lucas
4e2f66f709 device: raise a CommandTimeoutError error on command timeout 2022-10-20 22:11:07 +00:00
Alan Rosenthal
3d79d7def5 Add runtime switch for filtering by address.
* scan on [filter pattern]
* filter address <filter pattern>
2022-10-20 14:47:14 -04:00
Abel Lucas
915405a9bd examples: update run_classic_connect example to take multiple addresses instead of one 2022-10-20 14:53:39 +00:00
Abel Lucas
45dd849d9f classic: update ConnectionError to take transport and peer address 2022-10-20 14:53:03 +00:00
Abel Lucas
7208fd6642 classic: update Device.connect to allow parallels connection creation
According to the specification nothing prevent the Host from creating
multiple connections at the same time. This commit add this mechanisme
by matching the `connection` and `connection_failure` events against the
peer address.
2022-10-19 17:44:44 +00:00
36 changed files with 3283 additions and 425 deletions

View File

@@ -14,6 +14,10 @@ jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
fail-fast: false
steps:
- name: Check out from Git
@@ -21,18 +25,18 @@ jobs:
- name: Get history and tags for SCM versioning to work
run: |
git fetch --prune --unshallow
git fetch --depth=1 origin +refs/tags/*:refs/tags/*
- name: Set up Python 3.10
uses: actions/setup-python@v3
git fetch --depth=1 origin +refs/tags/*:refs/tags/*
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install ".[build,test,development,documentation]"
- name: Test with pytest
- name: Test
run: |
pytest
invoke test
- name: Build
run: |
inv build

5
.gitignore vendored
View File

@@ -3,9 +3,6 @@ build/
dist/
*.egg-info/
*~
bumble/__pycache__
docs/mkdocs/site
tests/__pycache__
test-results.xml
bumble/transport/__pycache__
bumble/profiles/__pycache__
__pycache__

View File

@@ -20,12 +20,13 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
from bumble.hci import HCI_Constant
import os
import os.path
import logging
import click
import os
import random
import re
from collections import OrderedDict
import click
import colors
from bumble.core import UUID, AdvertisingData, TimeoutError, BT_LE_TRANSPORT
@@ -34,6 +35,7 @@ from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic
from bumble.hci import (
HCI_Constant,
HCI_LE_1M_PHY,
HCI_LE_2M_PHY,
HCI_LE_CODED_PHY,
@@ -126,7 +128,7 @@ class ConsoleApp:
self.known_attributes = []
self.device = None
self.connected_peer = None
self.top_tab = 'scan'
self.top_tab = 'device'
self.monitor_rssi = False
self.connection_rssi = None
@@ -150,7 +152,8 @@ class ConsoleApp:
return NestedCompleter.from_nested_dict({
'scan': {
'on': None,
'off': None
'off': None,
'clear': None
},
'advertise': {
'on': None,
@@ -164,7 +167,11 @@ class ConsoleApp:
'scan': None,
'services': None,
'attributes': None,
'log': None
'log': None,
'device': None
},
'filter': {
'address': None,
},
'connect': LiveCompleter(self.known_addresses),
'update-parameters': None,
@@ -207,6 +214,7 @@ class ConsoleApp:
self.scan_results_text = FormattedTextControl()
self.services_text = FormattedTextControl()
self.attributes_text = FormattedTextControl()
self.device_text = FormattedTextControl()
self.log_text = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)))
self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
@@ -229,6 +237,10 @@ class ConsoleApp:
Frame(Window(self.log_text, height=self.log_height), title='Log'),
filter=Condition(lambda: self.top_tab == 'log')
),
ConditionalContainer(
Frame(Window(self.device_text), title='Device'),
filter=Condition(lambda: self.top_tab == 'device')
),
Frame(Window(self.output, height=self.output_height)),
FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
self.input_field
@@ -267,9 +279,14 @@ class ConsoleApp:
if device_config:
self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
else:
self.device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
random_address = f"{random.randint(192,255):02X}" # address is static random
for c in random.sample(range(255), 5):
random_address += f":{c:02X}"
self.append_to_log(f"Setting random address: {random_address}")
self.device = Device.with_hci('Bumble', random_address, hci_source, hci_sink)
self.device.listener = DeviceListener(self)
await self.device.power_on()
self.show_device(self.device)
# Run the UI
await self.ui.run_async()
@@ -294,7 +311,7 @@ class ConsoleApp:
rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
if self.device:
if self.device.is_connecting:
if self.device.is_le_connecting:
connection_state = 'CONNECTING'
elif self.connected_peer:
connection = self.connected_peer.connection
@@ -351,7 +368,7 @@ class ConsoleApp:
self.services_text.text = lines
self.ui.invalidate()
async def show_attributes(self, attributes):
def show_attributes(self, attributes):
lines = []
for attribute in attributes:
@@ -360,6 +377,44 @@ class ConsoleApp:
self.attributes_text.text = lines
self.ui.invalidate()
def show_device(self, device):
lines = []
lines.append(('ansicyan', 'Name: '))
lines.append(('', f'{device.name}\n'))
lines.append(('ansicyan', 'Public Address: '))
lines.append(('', f'{device.public_address}\n'))
lines.append(('ansicyan', 'Random Address: '))
lines.append(('', f'{device.random_address}\n'))
lines.append(('ansicyan', 'LE Enabled: '))
lines.append(('', f'{device.le_enabled}\n'))
lines.append(('ansicyan', 'Classic Enabled: '))
lines.append(('', f'{device.classic_enabled}\n'))
lines.append(('ansicyan', 'Classic SC Enabled: '))
lines.append(('', f'{device.classic_sc_enabled}\n'))
lines.append(('ansicyan', 'Classic SSP Enabled: '))
lines.append(('', f'{device.classic_ssp_enabled}\n'))
lines.append(('ansicyan', 'Classic Class: '))
lines.append(('', f'{device.class_of_device}\n'))
lines.append(('ansicyan', 'Discoverable: '))
lines.append(('', f'{device.discoverable}\n'))
lines.append(('ansicyan', 'Connectable: '))
lines.append(('', f'{device.connectable}\n'))
lines.append(('ansicyan', 'Advertising Data: '))
lines.append(('', f'{device.advertising_data}\n'))
lines.append(('ansicyan', 'Scan Response Data: '))
lines.append(('', f'{device.scan_response_data}\n'))
advertising_interval = (
device.advertising_interval_min
if device.advertising_interval_min == device.advertising_interval_max
else f"{device.advertising_interval_min} to {device.advertising_interval_max}"
)
lines.append(('ansicyan', 'Advertising Interval: '))
lines.append(('', f'{advertising_interval}\n'))
self.device_text.text = lines
self.ui.invalidate()
def append_to_output(self, line, invalidate=True):
if type(line) is str:
line = [('', line)]
@@ -388,7 +443,10 @@ class ConsoleApp:
# Discover all services, characteristics and descriptors
self.append_to_output('discovering services...')
await self.connected_peer.discover_services()
self.append_to_output(f'found {len(self.connected_peer.services)} services, discovering charateristics...')
self.append_to_output(
f'found {len(self.connected_peer.services)} services,'
' discovering characteristics...'
)
await self.connected_peer.discover_characteristics()
self.append_to_output('found characteristics, discovering descriptors...')
for service in self.connected_peer.services:
@@ -408,7 +466,7 @@ class ConsoleApp:
attributes = await self.connected_peer.discover_attributes()
self.append_to_output(f'discovered {len(attributes)} attributes...')
await self.show_attributes(attributes)
self.show_attributes(attributes)
def find_characteristic(self, param):
parts = param.split('.')
@@ -455,10 +513,23 @@ class ConsoleApp:
else:
await self.device.start_scanning()
elif params[0] == 'on':
if len(params) == 2:
if not params[1].startswith("filter="):
self.show_error('invalid syntax', 'expected address filter=key1:value1,key2:value,... available filters: address')
# regex: (word):(any char except ,)
matches = re.findall(r"(\w+):([^,]+)", params[1])
for match in matches:
if match[0] == "address":
self.device.listener.address_filter = match[1]
await self.device.start_scanning()
self.top_tab = 'scan'
elif params[0] == 'off':
await self.device.stop_scanning()
elif params[0] == 'clear':
self.device.listener.scan_results.clear()
self.known_addresses.clear()
self.show_scan_results(self.device.listener.scan_results)
else:
self.show_error('unsupported arguments for scan command')
@@ -490,6 +561,9 @@ class ConsoleApp:
for phy in phys
}
if self.device.is_scanning:
await self.device.stop_scanning()
self.append_to_output('connecting...')
try:
@@ -503,7 +577,7 @@ class ConsoleApp:
self.show_error('connection timed out')
async def do_disconnect(self, params):
if self.device.connecting:
if self.device.is_le_connecting:
await self.device.cancel_connection()
else:
if not self.connected_peer:
@@ -555,7 +629,7 @@ class ConsoleApp:
async def do_show(self, params):
if params:
if params[0] in {'scan', 'services', 'attributes', 'log'}:
if params[0] in {'scan', 'services', 'attributes', 'log', 'device'}:
self.top_tab = params[0]
self.ui.invalidate()
@@ -708,6 +782,12 @@ class ConsoleApp:
async def do_quit(self, params):
self.ui.exit()
async def do_filter(self, params):
if params[0] == "address":
if len(params) != 2:
self.show_error('invalid syntax', 'expected filter address <pattern>')
return
self.device.listener.address_filter = params[1]
# -----------------------------------------------------------------------------
# Device and Connection Listener
@@ -716,6 +796,26 @@ class DeviceListener(Device.Listener, Connection.Listener):
def __init__(self, app):
self.app = app
self.scan_results = OrderedDict()
self.address_filter = None
@property
def address_filter(self):
return self._address_filter
@address_filter.setter
def address_filter(self, filter_addr):
if filter_addr is None:
self._address_filter = re.compile(r".*")
else:
self._address_filter = re.compile(filter_addr)
self.scan_results = OrderedDict(filter(lambda x: self.filter_address_match(x), self.scan_results))
self.app.show_scan_results(self.scan_results)
def filter_address_match(self, address):
"""
Returns true if an address matches the filter
"""
return bool(self.address_filter.match(address))
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
@@ -745,6 +845,9 @@ class DeviceListener(Device.Listener, Connection.Listener):
self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}')
def on_advertisement(self, advertisement):
if not self.filter_address_match(str(advertisement.address)):
return
entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
entry = self.scan_results.get(entry_key)
if entry:
@@ -777,9 +880,9 @@ class ScanResult:
else:
type_color = colors.cyan
name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME)
name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
if name is None:
name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME)
name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
if name:
# Convert to string
try:
@@ -802,6 +905,7 @@ class LogHandler(logging.Handler):
def __init__(self, app):
super().__init__()
self.app = app
self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))
def emit(self, record):
message = self.format(record)
@@ -826,6 +930,7 @@ def main(device_config, transport):
# logging.basicConfig(level = 'FATAL')
# logging.basicConfig(level = 'DEBUG')
root_logger = logging.getLogger()
root_logger.addHandler(LogHandler(app))
root_logger.setLevel(logging.DEBUG)

View File

@@ -17,13 +17,14 @@
# -----------------------------------------------------------------------------
import asyncio
import os
import struct
import logging
import click
from colors import color
from bumble.device import Device, Peer
from bumble.core import AdvertisingData
from bumble.gatt import Service, Characteristic
from bumble.gatt import Service, Characteristic, CharacteristicValue
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.hci import HCI_Constant
@@ -41,13 +42,59 @@ GG_PREFERRED_MTU = 256
# -----------------------------------------------------------------------------
class GattlinkHubBridge(Device.Listener):
class GattlinkL2capEndpoint:
def __init__(self):
self.peer = None
self.rx_socket = None
self.tx_socket = None
self.rx_characteristic = None
self.tx_characteristic = None
self.l2cap_channel = None
self.l2cap_packet = b''
self.l2cap_packet_size = 0
# Called when an L2CAP SDU has been received
def on_coc_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
while len(sdu):
if self.l2cap_packet_size == 0:
# Expect a new packet
self.l2cap_packet_size = sdu[0] + 1
sdu = sdu[1:]
else:
bytes_needed = self.l2cap_packet_size - len(self.l2cap_packet)
chunk = min(bytes_needed, len(sdu))
self.l2cap_packet += sdu[:chunk]
sdu = sdu[chunk:]
if len(self.l2cap_packet) == self.l2cap_packet_size:
self.on_l2cap_packet(self.l2cap_packet)
self.l2cap_packet = b''
self.l2cap_packet_size = 0
# -----------------------------------------------------------------------------
class GattlinkHubBridge(GattlinkL2capEndpoint, Device.Listener):
def __init__(self, device, peer_address):
super().__init__()
self.device = device
self.peer_address = peer_address
self.peer = None
self.tx_socket = None
self.rx_characteristic = None
self.tx_characteristic = None
self.l2cap_psm_characteristic = None
device.listener = self
async def start(self):
# Connect to the peer
print(f'=== Connecting to {self.peer_address}...')
await self.device.connect(self.peer_address)
async def connect_l2cap(self, psm):
print(color(f'### Connecting with L2CAP on PSM = {psm}', 'yellow'))
try:
self.l2cap_channel = await self.peer.connection.open_l2cap_channel(psm)
print(color('*** Connected', 'yellow'), self.l2cap_channel)
self.l2cap_channel.sink = self.on_coc_sdu
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
@@ -80,15 +127,24 @@ class GattlinkHubBridge(Device.Listener):
self.rx_characteristic = characteristic
elif characteristic.uuid == GG_GATTLINK_TX_CHARACTERISTIC_UUID:
self.tx_characteristic = characteristic
elif characteristic.uuid == GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID:
self.l2cap_psm_characteristic = characteristic
print('RX:', self.rx_characteristic)
print('TX:', self.tx_characteristic)
print('PSM:', self.l2cap_psm_characteristic)
# Subscribe to TX
if self.tx_characteristic:
if self.l2cap_psm_characteristic:
# Subscribe to and then read the PSM value
await self.peer.subscribe(self.l2cap_psm_characteristic, self.on_l2cap_psm_received)
psm_bytes = await self.peer.read_value(self.l2cap_psm_characteristic)
psm = struct.unpack('<H', psm_bytes)[0]
await self.connect_l2cap(psm)
elif self.tx_characteristic:
# Subscribe to TX
await self.peer.subscribe(self.tx_characteristic, self.on_tx_received)
print(color('=== Subscribed to Gattlink TX', 'yellow'))
else:
print(color('!!! Gattlink TX not found', 'red'))
print(color('!!! No Gattlink TX or PSM found', 'red'))
def on_connection_failure(self, error):
print(color(f'!!! Connection failed: {error}'))
@@ -99,31 +155,23 @@ class GattlinkHubBridge(Device.Listener):
self.rx_characteristic = None
self.peer = None
# Called when an L2CAP packet has been received
def on_l2cap_packet(self, packet):
print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(packet)
# Called by the GATT client when a notification is received
def on_tx_received(self, value):
print(color('>>> TX:', 'magenta'), value.hex())
print(color(f'<<< [GATT TX]: {len(value)} bytes', 'cyan'))
if self.tx_socket:
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(value)
# Called by asyncio when the UDP socket is created
def connection_made(self, transport):
pass
# Called by asyncio when a UDP datagram is received
def datagram_received(self, data, address):
print(color('<<< RX:', 'magenta'), data.hex())
# TODO: use a queue instead of creating a task everytime
if self.peer and self.rx_characteristic:
asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
# -----------------------------------------------------------------------------
class GattlinkNodeBridge(Device.Listener):
def __init__(self):
self.peer = None
self.rx_socket = None
self.tx_socket = None
def on_l2cap_psm_received(self, value):
psm = struct.unpack('<H', value)[0]
asyncio.create_task(self.connect_l2cap(psm))
# Called by asyncio when the UDP socket is created
def connection_made(self, transport):
@@ -131,21 +179,130 @@ class GattlinkNodeBridge(Device.Listener):
# Called by asyncio when a UDP datagram is received
def datagram_received(self, data, address):
print(color('<<< RX:', 'magenta'), data.hex())
print(color(f'<<< [UDP]: {len(data)} bytes', 'green'))
# TODO: use a queue instead of creating a task everytime
if self.peer and self.rx_characteristic:
if self.l2cap_channel:
print(color('>>> [L2CAP]', 'yellow'))
self.l2cap_channel.write(bytes([len(data) - 1]) + data)
elif self.peer and self.rx_characteristic:
print(color('>>> [GATT RX]', 'yellow'))
asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
# -----------------------------------------------------------------------------
async def run(hci_transport, device_address, send_host, send_port, receive_host, receive_port):
class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
def __init__(self, device):
super().__init__()
self.device = device
self.peer = None
self.tx_socket = None
self.tx_subscriber = None
self.rx_characteristic = None
# Register as a listener
device.listener = self
# Listen for incoming L2CAP CoC connections
psm = 0xFB
device.register_l2cap_channel_server(0xFB, self.on_coc)
print(f'### Listening for CoC connection on PSM {psm}')
# Setup the Gattlink service
self.rx_characteristic = Characteristic(
GG_GATTLINK_RX_CHARACTERISTIC_UUID,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=self.on_rx_write)
)
self.tx_characteristic = Characteristic(
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
Characteristic.NOTIFY,
Characteristic.READABLE
)
self.tx_characteristic.on('subscription', self.on_tx_subscription)
self.psm_characteristic = Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([psm, 0])
)
gattlink_service = Service(
GG_GATTLINK_SERVICE_UUID,
[
self.rx_characteristic,
self.tx_characteristic,
self.psm_characteristic
]
)
device.add_services([gattlink_service])
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
bytes(reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))))
])
)
async def start(self):
await self.device.start_advertising()
# Called by asyncio when the UDP socket is created
def connection_made(self, transport):
self.transport = transport
# Called by asyncio when a UDP datagram is received
def datagram_received(self, data, address):
print(color(f'<<< [UDP]: {len(data)} bytes', 'green'))
if self.l2cap_channel:
print(color('>>> [L2CAP]', 'yellow'))
self.l2cap_channel.write(bytes([len(data) - 1]) + data)
elif self.tx_subscriber:
print(color('>>> [GATT TX]', 'yellow'))
self.tx_characteristic.value = data
asyncio.create_task(self.device.notify_subscribers(self.tx_characteristic))
# Called when a write to the RX characteristic has been received
def on_rx_write(self, connection, data):
print(color(f'<<< [GATT RX]: {len(data)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(data)
# Called when the subscription to the TX characteristic has changed
def on_tx_subscription(self, peer, enabled):
print(f'### [GATT TX] subscription from {peer}: {"enabled" if enabled else "disabled"}')
if enabled:
self.tx_subscriber = peer
else:
self.tx_subscriber = None
# Called when an L2CAP packet is received
def on_l2cap_packet(self, packet):
print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(packet)
# Called when a new connection is established
def on_coc(self, channel):
print('*** CoC Connection', channel)
self.l2cap_channel = channel
channel.sink = self.on_coc_sdu
# -----------------------------------------------------------------------------
async def run(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port):
print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
# Instantiate a bridge object
bridge = GattlinkNodeBridge()
device = Device.with_hci('Bumble GG', device_address, hci_source, hci_sink)
# Instantiate a bridge object
if role_or_peer_address == 'node':
bridge = GattlinkNodeBridge(device)
else:
bridge = GattlinkHubBridge(device, role_or_peer_address)
# Create a UDP to RX bridge (receive from UDP, send to RX)
loop = asyncio.get_running_loop()
@@ -160,35 +317,8 @@ async def run(hci_transport, device_address, send_host, send_port, receive_host,
remote_addr=(send_host, send_port)
)
# Create a device to manage the host, with a custom listener
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device.listener = bridge
await device.power_on()
# Connect to the peer
# print(f'=== Connecting to {device_address}...')
# await device.connect(device_address)
# TODO move to class
gattlink_service = Service(
GG_GATTLINK_SERVICE_UUID,
[
Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ,
Characteristic.READABLE,
bytes([193, 0])
)
]
)
device.add_services([gattlink_service])
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, bytes(reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))))
])
)
await device.start_advertising()
await bridge.start()
# Wait until the source terminates
await hci_source.wait_for_termination()
@@ -197,15 +327,16 @@ async def run(hci_transport, device_address, send_host, send_port, receive_host,
@click.command()
@click.argument('hci_transport')
@click.argument('device_address')
@click.argument('role_or_peer_address')
@click.option('-sh', '--send-host', type=str, default='127.0.0.1', help='UDP host to send to')
@click.option('-sp', '--send-port', type=int, default=9001, help='UDP port to send to')
@click.option('-rh', '--receive-host', type=str, default='127.0.0.1', help='UDP host to receive on')
@click.option('-rp', '--receive-port', type=int, default=9000, help='UDP port to receive on')
def main(hci_transport, device_address, send_host, send_port, receive_host, receive_port):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run(hci_transport, device_address, send_host, send_port, receive_host, receive_port))
def main(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port):
asyncio.run(run(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port))
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
if __name__ == '__main__':
main()

331
apps/l2cap_bridge.py Normal file
View File

@@ -0,0 +1,331 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import click
import logging
import os
from colors import color
from bumble.transport import open_transport_or_link
from bumble.device import Device
from bumble.utils import FlowControlAsyncPipe
from bumble.hci import HCI_Constant
# -----------------------------------------------------------------------------
class ServerBridge:
"""
L2CAP CoC server bridge: waits for a peer to connect an L2CAP CoC channel
on a specified PSM. When the connection is made, the bridge connects a TCP
socket to a remote host and bridges the data in both directions, with flow
control.
When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket
and waits for a new L2CAP CoC channel to be connected.
When the TCP connection is closed by the TCP server, XXXX
"""
def __init__(
self,
psm,
max_credits,
mtu,
mps,
tcp_host,
tcp_port
):
self.psm = psm
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
self.tcp_host = tcp_host
self.tcp_port = tcp_port
async def start(self, device):
# Listen for incoming L2CAP CoC connections
device.register_l2cap_channel_server(
psm = self.psm,
server = self.on_coc,
max_credits = self.max_credits,
mtu = self.mtu,
mps = self.mps
)
print(color(f'### Listening for CoC connection on PSM {self.psm}', 'yellow'))
def on_ble_connection(connection):
def on_ble_disconnection(reason):
print(color('@@@ Bluetooth disconnection:', 'red'), HCI_Constant.error_name(reason))
print(color('@@@ Bluetooth connection:', 'green'), connection)
connection.on('disconnection', on_ble_disconnection)
device.on('connection', on_ble_connection)
await device.start_advertising(auto_restart=True)
# Called when a new L2CAP connection is established
def on_coc(self, l2cap_channel):
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
class Pipe:
def __init__(self, bridge, l2cap_channel):
self.bridge = bridge
self.tcp_transport = None
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_coc_sdu
async def connect_to_tcp(self):
# Connect to the TCP server
print(color(f'### Connecting to TCP {self.bridge.tcp_host}:{self.bridge.tcp_port}...', 'yellow'))
class TcpClientProtocol(asyncio.Protocol):
def __init__(self, pipe):
self.pipe = pipe
def connection_lost(self, error):
print(color(f'!!! TCP connection lost: {error}', 'red'))
if self.pipe.l2cap_channel is not None:
asyncio.create_task(self.pipe.l2cap_channel.disconnect())
def data_received(self, data):
print(f'<<< Received on TCP: {len(data)}')
self.pipe.l2cap_channel.write(data)
try:
self.tcp_transport, _ = await asyncio.get_running_loop().create_connection(
lambda: TcpClientProtocol(self),
host=self.bridge.tcp_host,
port=self.bridge.tcp_port,
)
print(color('### Connected', 'green'))
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
await self.l2cap_channel.disconnect()
def on_l2cap_close(self):
self.l2cap_channel = None
if self.tcp_transport is not None:
self.tcp_transport.close()
def on_coc_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
if self.tcp_transport is None:
print(color('!!! TCP socket not open, dropping', 'red'))
return
self.tcp_transport.write(sdu)
pipe = Pipe(self, l2cap_channel)
asyncio.create_task(pipe.connect_to_tcp())
# -----------------------------------------------------------------------------
class ClientBridge:
"""
L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound
TCP connection on a specified port number. When a TCP client connects, an
L2CAP CoC channel connection to the BLE device is established, and the data
is bridged in both directions, with flow control.
When the TCP connection is closed by the client, the L2CAP CoC channel is
disconnected, but the connection to the BLE device remains, ready for a new
TCP client to connect.
When the L2CAP CoC channel is closed, XXXX
"""
READ_CHUNK_SIZE = 4096
def __init__(
self,
psm,
max_credits,
mtu,
mps,
address,
tcp_host,
tcp_port
):
self.psm = psm
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
self.address = address
self.tcp_host = tcp_host
self.tcp_port = tcp_port
async def start(self, device):
print(color(f'### Connecting to {self.address}...', 'yellow'))
connection = await device.connect(self.address)
print(color('### Connected', 'green'))
# Called when the BLE connection is disconnected
def on_ble_disconnection(reason):
print(color('@@@ Bluetooth disconnection:', 'red'), HCI_Constant.error_name(reason))
connection.on('disconnection', on_ble_disconnection)
# Called when a TCP connection is established
async def on_tcp_connection(reader, writer):
peername = writer.get_extra_info('peername')
print(color(f'<<< TCP connection from {peername}', 'magenta'))
def on_coc_sdu(sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
l2cap_to_tcp_pipe.write(sdu)
def on_l2cap_close():
print(color('*** L2CAP channel closed', 'red'))
l2cap_to_tcp_pipe.stop()
writer.close()
# Connect a new L2CAP channel
print(color(f'>>> Opening L2CAP channel on PSM = {self.psm}', 'yellow'))
try:
l2cap_channel = await connection.open_l2cap_channel(
psm = self.psm,
max_credits = self.max_credits,
mtu = self.mtu,
mps = self.mps
)
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
writer.close()
return
l2cap_channel.sink = on_coc_sdu
l2cap_channel.on('close', on_l2cap_close)
# Start a flow control pipe from L2CAP to TCP
l2cap_to_tcp_pipe = FlowControlAsyncPipe(
l2cap_channel.pause_reading,
l2cap_channel.resume_reading,
writer.write,
writer.drain
)
l2cap_to_tcp_pipe.start()
# Pipe data from TCP to L2CAP
while True:
try:
data = await reader.read(self.READ_CHUNK_SIZE)
if len(data) == 0:
print(color('!!! End of stream', 'red'))
await l2cap_channel.disconnect()
return
print(color(f'<<< [TCP DATA]: {len(data)} bytes', 'blue'))
l2cap_channel.write(data)
await l2cap_channel.drain()
except Exception as error:
print(f'!!! Exception: {error}')
break
writer.close()
print(color('~~~ Bye bye', 'magenta'))
await asyncio.start_server(
on_tcp_connection,
host=self.tcp_host if self.tcp_host != '_' else None,
port=self.tcp_port
)
print(color(f'### Listening for TCP connections on port {self.tcp_port}', 'magenta'))
# -----------------------------------------------------------------------------
async def run(device_config, hci_transport, bridge):
print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
# Let's go
await device.power_on()
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
@click.option('--device-config', help='Device configuration file', required=True)
@click.option('--hci-transport', help='HCI transport', required=True)
@click.option('--psm', help='PSM for L2CAP CoC', type=int, default=1234)
@click.option('--l2cap-coc-max-credits', help='Maximum L2CAP CoC Credits', type=click.IntRange(1, 65535), default=128)
@click.option('--l2cap-coc-mtu', help='L2CAP CoC MTU', type=click.IntRange(23, 65535), default=1022)
@click.option('--l2cap-coc-mps', help='L2CAP CoC MPS', type=click.IntRange(23, 65533), default=1024)
def cli(context, device_config, hci_transport, psm, l2cap_coc_max_credits, l2cap_coc_mtu, l2cap_coc_mps):
context.ensure_object(dict)
context.obj['device_config'] = device_config
context.obj['hci_transport'] = hci_transport
context.obj['psm'] = psm
context.obj['max_credits'] = l2cap_coc_max_credits
context.obj['mtu'] = l2cap_coc_mtu
context.obj['mps'] = l2cap_coc_mps
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.option('--tcp-host', help='TCP host', default='localhost')
@click.option('--tcp-port', help='TCP port', default=9544)
def server(context, tcp_host, tcp_port):
bridge = ServerBridge(
context.obj['psm'],
context.obj['max_credits'],
context.obj['mtu'],
context.obj['mps'],
tcp_host,
tcp_port)
asyncio.run(run(
context.obj['device_config'],
context.obj['hci_transport'],
bridge
))
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.argument('bluetooth-address')
@click.option('--tcp-host', help='TCP host', default='_')
@click.option('--tcp-port', help='TCP port', default=9543)
def client(context, bluetooth_address, tcp_host, tcp_port):
bridge = ClientBridge(
context.obj['psm'],
context.obj['max_credits'],
context.obj['mtu'],
context.obj['mps'],
bluetooth_address,
tcp_host,
tcp_port
)
asyncio.run(run(
context.obj['device_config'],
context.obj['hci_transport'],
bridge
))
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
if __name__ == '__main__':
cli(obj={})

View File

@@ -351,7 +351,7 @@ class MediaPacketPump:
logger.debug('pump canceled')
# Pump packets
self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
self.pump_task = asyncio.create_task(pump_packets())
async def stop(self):
# Stop the pump
@@ -1890,10 +1890,10 @@ class LocalSource(LocalStreamEndPoint, EventEmitter):
self.configuration = configuration
def on_start_command(self):
asyncio.get_running_loop().create_task(self.start())
asyncio.create_task(self.start())
def on_suspend_command(self):
asyncio.get_running_loop().create_task(self.stop())
asyncio.create_task(self.stop())
# -----------------------------------------------------------------------------

View File

@@ -58,6 +58,12 @@ def padded_bytes(buffer, size):
return buffer + bytes(padding_size)
def get_dict_key_by_value(dictionary, value):
for key, val in dictionary.items():
if val == value:
return key
return None
# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
@@ -91,6 +97,10 @@ class TimeoutError(Exception):
""" Timeout Error """
class CommandTimeoutError(Exception):
""" Command Timeout Error """
class InvalidStateError(Exception):
""" Invalid State Error """
@@ -100,6 +110,11 @@ class ConnectionError(BaseError):
FAILURE = 0x01
CONNECTION_REFUSED = 0x02
def __init__(self, error_code, transport, peer_address, error_namespace='', error_name='', details=''):
super().__init__(error_code, error_namespace, error_name, details)
self.transport = transport
self.peer_address = peer_address
# -----------------------------------------------------------------------------
# UUID
@@ -126,7 +141,7 @@ class UUID:
else:
uuid_str = uuid_str_or_int
if len(uuid_str) != 32 and len(uuid_str) != 8 and len(uuid_str) != 4:
raise ValueError('invalid UUID format')
raise ValueError(f"invalid UUID format: {uuid_str}")
self.uuid_bytes = bytes(reversed(bytes.fromhex(uuid_str)))
self.name = name
@@ -760,17 +775,20 @@ class AdvertisingData:
def ad_data_to_object(ad_type, ad_data):
if ad_type in {
AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS
}:
return AdvertisingData.uuid_list_to_objects(ad_data, 2)
elif ad_type in {
AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS
}:
return AdvertisingData.uuid_list_to_objects(ad_data, 4)
elif ad_type in {
AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS
}:
return AdvertisingData.uuid_list_to_objects(ad_data, 16)
elif ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID:
@@ -781,11 +799,24 @@ class AdvertisingData:
return (UUID.from_bytes(ad_data[:16]), ad_data[16:])
elif ad_type in {
AdvertisingData.SHORTENED_LOCAL_NAME,
AdvertisingData.COMPLETE_LOCAL_NAME
AdvertisingData.COMPLETE_LOCAL_NAME,
AdvertisingData.URI
}:
return ad_data.decode("utf-8")
elif ad_type == AdvertisingData.TX_POWER_LEVEL:
elif ad_type in {
AdvertisingData.TX_POWER_LEVEL,
AdvertisingData.FLAGS
}:
return ad_data[0]
elif ad_type in {
AdvertisingData.APPEARANCE,
AdvertisingData.ADVERTISING_INTERVAL
}:
return struct.unpack('<H', ad_data)[0]
elif ad_type == AdvertisingData.CLASS_OF_DEVICE:
return struct.unpack('<I', bytes([*ad_data, 0]))[0]
elif ad_type == AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return struct.unpack('<HH', ad_data)
elif ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA:
return (struct.unpack_from('<H', ad_data, 0)[0], ad_data[2:])
else:
@@ -802,7 +833,7 @@ class AdvertisingData:
self.ad_structures.append((ad_type, ad_data))
offset += length
def get(self, type_id, return_all=False, raw=True):
def get(self, type_id, return_all=False, raw=False):
'''
Get Advertising Data Structure(s) with a given type

File diff suppressed because it is too large Load Diff

View File

@@ -22,9 +22,12 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import enum
import types
import logging
from pyee import EventEmitter
from colors import color
from .core import *
@@ -149,6 +152,14 @@ GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart
# Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
# ASHA Service
GATT_ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC = UUID('f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint')
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID('38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus')
GATT_ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
# Misc
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
@@ -185,7 +196,7 @@ class Service(Attribute):
See Vol 3, Part G - 3.1 SERVICE DEFINITION
'''
def __init__(self, uuid, characteristics, primary=True):
def __init__(self, uuid, characteristics: list[Characteristic], primary=True):
# Convert the uuid to a UUID object if it isn't already
if type(uuid) is str:
uuid = UUID(uuid)
@@ -200,6 +211,14 @@ class Service(Attribute):
self.characteristics = characteristics[:]
self.primary = primary
def get_advertising_data(self):
"""
Get Service specific advertising data
Defined by each Service, default value is empty
:return Service data for advertising
"""
return None
def __str__(self):
return f'Service(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}){"" if self.primary else "*"}'
@@ -254,21 +273,50 @@ class Characteristic(Attribute):
if properties & p
])
def __init__(self, uuid, properties, permissions, value = b'', descriptors = []):
@staticmethod
def string_to_properties(properties_str: str):
return functools.reduce(
lambda x, y: x | get_dict_key_by_value(Characteristic.PROPERTY_NAMES, y),
properties_str.split(","),
0,
)
def __init__(self, uuid, properties, permissions, value = b'', descriptors: list[Descriptor] = []):
super().__init__(uuid, permissions, value)
self.uuid = self.type
self.properties = properties
if type(properties) is str:
self.properties = Characteristic.string_to_properties(properties)
else:
self.properties = properties
self.descriptors = descriptors
def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors:
if descriptor.uuid == descriptor_type:
if descriptor.type == descriptor_type:
return descriptor
def __str__(self):
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
# -----------------------------------------------------------------------------
class CharacteristicDeclaration(Attribute):
'''
See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
'''
def __init__(self, characteristic, value_handle):
declaration_bytes = struct.pack(
'<BH',
characteristic.properties,
value_handle
) + characteristic.uuid.to_pdu_bytes()
super().__init__(GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes)
self.value_handle = value_handle
self.characteristic = characteristic
def __str__(self):
return f'CharacteristicDeclaration(handle=0x{self.handle:04X}, value_handle=0x{self.value_handle:04X}, uuid={self.characteristic.uuid}, properties={Characteristic.properties_as_string(self.characteristic.properties)})'
# -----------------------------------------------------------------------------
class CharacteristicValue:
'''
@@ -473,3 +521,12 @@ class Descriptor(Attribute):
def __str__(self):
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type}, value={self.read_value(None).hex()})'
class ClientCharacteristicConfigurationBits(enum.IntFlag):
'''
See Vol 3, Part G - 3.3.3.3 - Table 3.11 Client Characteristic Configuration bit field definition
'''
DEFAULT = 0x0000
NOTIFICATION = 0x0001
INDICATION = 0x0002

View File

@@ -26,19 +26,17 @@
import asyncio
import logging
import struct
from colors import color
from .core import ProtocolError, TimeoutError
from .hci import *
from .att import *
from .gatt import (
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_REQUEST_TIMEOUT,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Characteristic
)
from .core import InvalidStateError, ProtocolError, TimeoutError
from .gatt import (GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, Characteristic,
ClientCharacteristicConfigurationBits)
from .hci import *
# -----------------------------------------------------------------------------
# Logging
@@ -114,7 +112,7 @@ class CharacteristicProxy(AttributeProxy):
async def discover_descriptors(self):
return await self.client.discover_descriptors(self)
async def subscribe(self, subscriber=None):
async def subscribe(self, subscriber=None, prefer_notify=True):
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
@@ -128,7 +126,7 @@ class CharacteristicProxy(AttributeProxy):
self.subscribers[subscriber] = on_change
subscriber = on_change
return await self.client.subscribe(self, subscriber)
return await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
@@ -273,7 +271,7 @@ class Client:
if response.op_code == ATT_ERROR_RESPONSE:
if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR:
# Unexpected end
logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
logger.warning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
# TODO raise appropriate exception
return
break
@@ -337,7 +335,7 @@ class Client:
if response.op_code == ATT_ERROR_RESPONSE:
if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR:
# Unexpected end
logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
logger.warning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
# TODO raise appropriate exception
return
break
@@ -546,7 +544,7 @@ class Client:
return attributes
async def subscribe(self, characteristic, subscriber=None):
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
# If we haven't already discovered the descriptors for this characteristic, do it now
if not characteristic.descriptors_discovered:
await self.discover_descriptors(characteristic)
@@ -557,23 +555,32 @@ class Client:
logger.warning('subscribing to characteristic with no CCCD descriptor')
return
# Set the subscription bits and select the subscriber set
bits = 0
subscriber_sets = []
if characteristic.properties & Characteristic.NOTIFY:
bits |= 0x0001
subscriber_sets.append(self.notification_subscribers.setdefault(characteristic.handle, set()))
if characteristic.properties & Characteristic.INDICATE:
bits |= 0x0002
subscriber_sets.append(self.indication_subscribers.setdefault(characteristic.handle, set()))
if (
characteristic.properties & Characteristic.NOTIFY
and characteristic.properties & Characteristic.INDICATE
):
if prefer_notify:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
subscribers = self.notification_subscribers
else:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
elif characteristic.properties & Characteristic.NOTIFY:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
subscribers = self.notification_subscribers
elif characteristic.properties & Characteristic.INDICATE:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
else:
raise InvalidStateError("characteristic is not notify or indicate")
# Add subscribers to the sets
for subscriber_set in subscriber_sets:
if subscriber is not None:
subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the characteristic
# emitting an 'update' event when a notification or indication is received
subscriber_set.add(characteristic)
subscriber_set = subscribers.setdefault(characteristic.handle, set())
if subscriber is not None:
subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the characteristic
# emitting an 'update' event when a notification or indication is received
subscriber_set.add(characteristic)
await self.write_value(cccd, struct.pack('<H', bits), with_response=True)

View File

@@ -26,6 +26,7 @@
import asyncio
import logging
from collections import defaultdict
from typing import Tuple, Optional
from pyee import EventEmitter
from colors import color
@@ -60,12 +61,21 @@ class Server(EventEmitter):
self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1))
self.pending_confirmations = defaultdict(lambda: None)
def __str__(self):
return "\n".join(map(str, self.attributes))
def send_gatt_pdu(self, connection_handle, pdu):
self.device.send_l2cap_pdu(connection_handle, ATT_CID, pdu)
def next_handle(self):
return 1 + len(self.attributes)
def get_advertising_service_data(self):
return {
attribute: data for attribute in self.attributes
if isinstance(attribute, Service) and (data := attribute.get_advertising_data())
}
def get_attribute(self, handle):
attribute = self.attributes_by_handle.get(handle)
if attribute:
@@ -79,6 +89,63 @@ class Server(EventEmitter):
return attribute
return None
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
return next(
(
attribute
for attribute in self.attributes
if attribute.type == GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
and attribute.uuid == service_uuid
),
None,
)
def get_characteristic_attributes(
self, service_uuid: UUID, characteristic_uuid: UUID
) -> Optional[Tuple[CharacteristicDeclaration, Characteristic]]:
service_handle = self.get_service_attribute(service_uuid)
if not service_handle:
return None
return next(
(
(attribute, self.get_attribute(attribute.characteristic.handle))
for attribute in map(
self.get_attribute,
range(service_handle.handle, service_handle.end_group_handle + 1),
)
if attribute.type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
and attribute.characteristic.uuid == characteristic_uuid
),
None,
)
def get_descriptor_attribute(
self, service_uuid: UUID, characteristic_uuid: UUID, descriptor_uuid: UUID
) -> Optional[Descriptor]:
characteristics = self.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
if not characteristics:
return None
(_, characteristic_value) = characteristics
return next(
(
attribute
for attribute in map(
self.get_attribute,
range(
characteristic_value.handle + 1,
characteristic_value.end_group_handle + 1,
),
)
if attribute.type == descriptor_uuid
),
None,
)
def add_attribute(self, attribute):
# Assign a handle to this attribute
attribute.handle = self.next_handle()
@@ -87,7 +154,7 @@ class Server(EventEmitter):
# Add this attribute to the list
self.attributes.append(attribute)
def add_service(self, service):
def add_service(self, service: Service):
# Add the service attribute to the DB
self.add_attribute(service)
@@ -95,16 +162,9 @@ class Server(EventEmitter):
# Add all characteristics
for characteristic in service.characteristics:
# Add a Characteristic Declaration (Vol 3, Part G - 3.3.1 Characteristic Declaration)
declaration_bytes = struct.pack(
'<BH',
characteristic.properties,
self.next_handle() + 1, # The value will be the next attribute after this declaration
) + characteristic.uuid.to_pdu_bytes()
characteristic_declaration = Attribute(
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Attribute.READABLE,
declaration_bytes
# Add a Characteristic Declaration
characteristic_declaration = CharacteristicDeclaration(
characteristic, self.next_handle() + 1
)
self.add_attribute(characteristic_declaration)
@@ -155,7 +215,7 @@ class Server(EventEmitter):
return cccd or bytes([0, 0])
def write_cccd(self, connection, characteristic, value):
logger.debug(f'Subscription update for connection={connection.handle:04X}, handle={characteristic.handle:04X}: {value.hex()}')
logger.debug(f'Subscription update for connection=0x{connection.handle:04X}, handle=0x{characteristic.handle:04X}: {value.hex()}')
# Sanity check
if len(value) != 2:

View File

@@ -1656,6 +1656,14 @@ class Address:
def address_type_name(address_type):
return name_or_number(Address.ADDRESS_TYPE_NAMES, address_type)
@staticmethod
def from_string_for_transport(string, transport):
if transport == BT_BR_EDR_TRANSPORT:
address_type = Address.PUBLIC_DEVICE_ADDRESS
else:
address_type = Address.RANDOM_DEVICE_ADDRESS
return Address(string, address_type)
@staticmethod
def parse_address(data, offset):
# Fix the type to a default value. This is used for parsing type-less Classic addresses
@@ -1696,6 +1704,9 @@ class Address:
self.address_type = address_type
def clone(self):
return Address(self.address_bytes, self.address_type)
@property
def is_public(self):
return self.address_type == self.PUBLIC_DEVICE_ADDRESS or self.address_type == self.PUBLIC_IDENTITY_ADDRESS
@@ -1732,9 +1743,36 @@ class Address:
'''
String representation of the address, MSB first
'''
return ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)])
str = ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)])
if not self.is_public:
return str
return str + '/P'
# Predefined address values
Address.NIL = Address(b"\xff\xff\xff\xff\xff\xff", Address.PUBLIC_DEVICE_ADDRESS)
Address.ANY = Address(b"\x00\x00\x00\x00\x00\x00", Address.PUBLIC_DEVICE_ADDRESS)
# -----------------------------------------------------------------------------
class OwnAddressType:
PUBLIC = 0
RANDOM = 1
RESOLVABLE_OR_PUBLIC = 2
RESOLVABLE_OR_RANDOM = 3
TYPE_NAMES = {
PUBLIC: 'PUBLIC',
RANDOM: 'RANDOM',
RESOLVABLE_OR_PUBLIC: 'RESOLVABLE_OR_PUBLIC',
RESOLVABLE_OR_RANDOM: 'RESOLVABLE_OR_RANDOM'
}
@staticmethod
def type_name(type):
return name_or_number(OwnAddressType.TYPE_NAMES, type)
TYPE_SPEC = {'size': 1, 'mapper': lambda x: OwnAddressType.type_name(x)}
# -----------------------------------------------------------------------------
class HCI_Packet:
'''
@@ -1935,6 +1973,17 @@ class HCI_Accept_Connection_Request_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('bd_addr', Address.parse_address),
('reason', {'size': 1, 'mapper': HCI_Constant.error_name})
])
class HCI_Reject_Connection_Request_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.9 Reject Connection Request Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('bd_addr', Address.parse_address),
@@ -2445,9 +2494,10 @@ class HCI_Write_Voice_Setting_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command()
class HCI_Read_Synchronous_Flow_Control_Enable_Command(HCI_Command):
'''
See Bluetooth spec @ 7.3.36 Write Synchronous Flow Control Enable Command
See Bluetooth spec @ 7.3.36 Read Synchronous Flow Control Enable Command
'''
@@ -2815,7 +2865,7 @@ class HCI_LE_Set_Random_Address_Command(HCI_Command):
('advertising_interval_min', 2),
('advertising_interval_max', 2),
('advertising_type', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Advertising_Parameters_Command.advertising_type_name(x)}),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('own_address_type', OwnAddressType.TYPE_SPEC),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_channel_map', 1),
@@ -2894,7 +2944,7 @@ class HCI_LE_Set_Advertising_Enable_Command(HCI_Command):
('le_scan_type', 1),
('le_scan_interval', 2),
('le_scan_window', 2),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('own_address_type', OwnAddressType.TYPE_SPEC),
('scanning_filter_policy', 1)
])
class HCI_LE_Set_Scan_Parameters_Command(HCI_Command):
@@ -2928,7 +2978,7 @@ class HCI_LE_Set_Scan_Enable_Command(HCI_Command):
('initiator_filter_policy', 1),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('own_address_type', OwnAddressType.TYPE_SPEC),
('connection_interval_min', 2),
('connection_interval_max', 2),
('max_latency', 2),
@@ -3250,7 +3300,7 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
('primary_advertising_interval_min', 3),
('primary_advertising_interval_max', 3),
('primary_advertising_channel_map', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(x)}),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('own_address_type', OwnAddressType.TYPE_SPEC),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_filter_policy', 1),
@@ -3654,7 +3704,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
initiating_phys_strs = bit_flags_to_strings(self.initiating_phys, HCI_LE_PHY_BIT_NAMES)
fields = [
('initiator_filter_policy:', self.initiator_filter_policy),
('own_address_type: ', Address.address_type_name(self.own_address_type)),
('own_address_type: ', OwnAddressType.type_name(self.own_address_type)),
('peer_address_type: ', Address.address_type_name(self.peer_address_type)),
('peer_address: ', str(self.peer_address)),
('initiating_phys: ', ','.join(initiating_phys_strs)),
@@ -3970,7 +4020,7 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event):
super().__init__(self.subevent_code, parameters)
def __str__(self):
reports = '\n'.join([report.to_string(' ') for report in self.reports])
reports = '\n'.join([f'{i}:\n{report.to_string(" ")}' for i, report in enumerate(self.reports)])
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
@@ -4187,7 +4237,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
super().__init__(self.subevent_code, parameters)
def __str__(self):
reports = '\n'.join([report.to_string(' ') for report in self.reports])
reports = '\n'.join([f'{i}:\n{report.to_string(" ")}' for i, report in enumerate(self.reports)])
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
@@ -4822,6 +4872,17 @@ class HCI_Link_Supervision_Timeout_Changed_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([
('bd_addr', Address.parse_address),
('passkey', 4)
])
class HCI_User_Passkey_Notification_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.48 User Passkey Notification Event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([
('bd_addr', Address.parse_address),

View File

@@ -79,6 +79,8 @@ class Host(EventEmitter):
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_le_features = 0
self.suggested_max_tx_octets = 251 # Max allowed
self.suggested_max_tx_time = 2120 # Max allowed
self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None
self.link_key_provider = None
@@ -119,24 +121,44 @@ class Host(EventEmitter):
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
logger.debug(
f'HCI ACL flow control: hc_acl_data_packet_length={self.hc_acl_data_packet_length},'
f'hc_total_num_acl_data_packets={self.hc_total_num_acl_data_packets}'
)
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command(), check_result=True)
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# LE and Classic share the same values
self.hc_le_acl_data_packet_length = self.hc_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = self.hc_total_num_acl_data_packets
logger.debug(
f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length},'
f'hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}'
)
logger.debug(
f'HCI ACL flow control: hc_acl_data_packet_length={self.hc_acl_data_packet_length},'
f'hc_total_num_acl_data_packets={self.hc_total_num_acl_data_packets}'
)
logger.debug(
f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length},'
f'hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}'
)
if (
response.return_parameters.hc_le_acl_data_packet_length == 0 or
response.return_parameters.hc_total_num_le_acl_data_packets == 0
):
# LE and Classic share the same values
self.hc_le_acl_data_packet_length = self.hc_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = self.hc_total_num_acl_data_packets
if (
self.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND) and
self.supports_command(HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND)
):
response = await self.send_command(HCI_LE_Read_Suggested_Default_Data_Length_Command())
suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
if (
suggested_max_tx_octets != self.suggested_max_tx_octets or
suggested_max_tx_time != self.suggested_max_tx_time
):
await self.send_command(HCI_LE_Write_Suggested_Default_Data_Length_Command(
suggested_max_tx_octets = self.suggested_max_tx_octets,
suggested_max_tx_time = self.suggested_max_tx_time
))
self.reset_done = True
@@ -176,6 +198,9 @@ class Host(EventEmitter):
if check_result:
if type(response.return_parameters) is int:
status = response.return_parameters
elif type(response.return_parameters) is bytes:
# return parameters first field is a one byte status code
status = response.return_parameters[0]
else:
status = response.return_parameters.status
@@ -344,13 +369,12 @@ class Host(EventEmitter):
# Classic only
def on_hci_connection_request_event(self, event):
# For now, just accept everything
# TODO: delegate the decision
self.send_command_sync(
HCI_Accept_Connection_Request_Command(
bd_addr = event.bd_addr,
role = 0x01 # Remain the peripheral
)
# Notify the listeners
self.emit(
'connection_request',
event.bd_addr,
event.class_of_device,
event.link_type,
)
def on_hci_le_connection_complete_event(self, event):
@@ -383,7 +407,7 @@ class Host(EventEmitter):
logger.debug(f'### CONNECTION FAILED: {event.status}')
# Notify the listeners
self.emit('connection_failure', event.connection_handle, event.status)
self.emit('connection_failure', BT_LE_TRANSPORT, event.peer_address, event.status)
def on_hci_le_enhanced_connection_complete_event(self, event):
# Just use the same implementation as for the non-enhanced event for now
@@ -413,7 +437,7 @@ class Host(EventEmitter):
logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}')
# Notify the client
self.emit('connection_failure', event.connection_handle, event.status)
self.emit('connection_failure', BT_BR_EDR_TRANSPORT, event.bd_addr, event.status)
def on_hci_disconnection_complete_event(self, event):
# Find the connection
@@ -575,6 +599,9 @@ class Host(EventEmitter):
def on_hci_simple_pairing_complete_event(self, event):
logger.debug(f'simple pairing complete for {event.bd_addr}: status={HCI_Constant.status_name(event.status)}')
# Notify the client
if event.status == HCI_SUCCESS:
self.emit('ssp_complete', event.bd_addr)
def on_hci_pin_code_request_event(self, event):
# For now, just refuse all requests
@@ -618,6 +645,9 @@ class Host(EventEmitter):
def on_hci_user_passkey_request_event(self, event):
self.emit('authentication_user_passkey_request', event.bd_addr)
def on_hci_user_passkey_notification_event(self, event):
self.emit('authentication_user_passkey_notification', event.bd_addr, event.passkey)
def on_hci_inquiry_complete_event(self, event):
self.emit('inquiry_complete')
@@ -645,3 +675,6 @@ class Host(EventEmitter):
self.emit('remote_name_failure', event.bd_addr, event.status)
else:
self.emit('remote_name', event.bd_addr, event.remote_name)
def on_hci_remote_host_supported_features_notification_event(self, event):
self.emit('remote_host_supported_features', event.bd_addr, event.host_supported_features)

View File

@@ -20,6 +20,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import json
@@ -143,6 +144,10 @@ class KeyStore:
async def get_all(self):
return []
async def delete_all(self):
all_keys = await self.get_all()
await asyncio.gather(*(self.delete(name) for (name, _) in all_keys))
async def get_resolving_keys(self):
all_keys = await self.get_all()
resolving_keys = []
@@ -259,6 +264,13 @@ class JsonKeyStore(KeyStore):
return [(name, PairingKeys.from_dict(keys)) for (name, keys) in namespace.items()]
async def delete_all(self):
db = await self.load()
db.pop(self.namespace, None)
await self.save(db)
async def get(self, name):
db = await self.load()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,141 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
import logging
from ..core import AdvertisingData
from ..gatt import (
GATT_ASHA_SERVICE,
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
GATT_ASHA_VOLUME_CHARACTERISTIC,
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
PackedCharacteristicAdapter
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class AshaService(TemplateService):
UUID = GATT_ASHA_SERVICE
OPCODE_START = 1
OPCODE_STOP = 2
OPCODE_STATUS = 3
PROTOCOL_VERSION = 0x01
RESERVED_FOR_FUTURE_USE = [00, 00]
FEATURE_MAP = [0x01] # [LE CoC audio output streaming supported]
SUPPORTED_CODEC_ID = [0x02, 0x01] # Codec IDs [G.722 at 16 kHz]
RENDER_DELAY = [00, 00]
def __init__(self, capability: int, hisyncid: [int]):
self.hisyncid = hisyncid
self.capability = capability # Device Capabilities [Left, Monaural]
# Handler for volume control
def on_volume_write(connection, value):
logger.info(f'--- VOLUME Write:{value[0]}')
# Handler for audio control commands
def on_audio_control_point_write(connection, value):
logger.info(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
opcode = value[0]
if opcode == AshaService.OPCODE_START:
# Start
audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
logger.info(
f'### START: codec={value[1]}, audio_type={audio_type}, volume={value[3]}, otherstate={value[4]}')
elif opcode == AshaService.OPCODE_STOP:
logger.info('### STOP')
elif opcode == AshaService.OPCODE_STATUS:
logger.info(f'### STATUS: connected={value[1]}')
# TODO Respond with a status
# asyncio.create_task(device.notify_subscribers(audio_status_characteristic, force=True))
self.read_only_properties_characteristic = Characteristic(
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([
AshaService.PROTOCOL_VERSION, # Version
self.capability,
]) +
bytes(self.hisyncid) +
bytes(AshaService.FEATURE_MAP) +
bytes(AshaService.RENDER_DELAY) +
bytes(AshaService.RESERVED_FOR_FUTURE_USE) +
bytes(AshaService.SUPPORTED_CODEC_ID)
)
self.audio_control_point_characteristic = Characteristic(
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write)
)
self.audio_status_characteristic = Characteristic(
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([0])
)
self.volume_characteristic = Characteristic(
GATT_ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write)
)
# TODO add real psm value
self.psm = 0x0080
# self.psm = device.register_l2cap_channel_server(0, on_coc, 8)
self.le_psm_out_characteristic = Characteristic(
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
struct.pack('<H', self.psm)
)
characteristics = [self.read_only_properties_characteristic,
self.audio_control_point_characteristic,
self.audio_status_characteristic,
self.volume_characteristic,
self.le_psm_out_characteristic]
super().__init__(characteristics)
def get_advertising_data(self):
# Advertisement only uses 4 least significant bytes of the HiSyncId.
return bytes(
AdvertisingData([
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(GATT_ASHA_SERVICE)),
(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(GATT_ASHA_SERVICE) + bytes([
AshaService.PROTOCOL_VERSION,
self.capability,
]) + bytes(self.hisyncid[:4]))
])
)

View File

@@ -21,7 +21,7 @@ import asyncio
from colors import color
from pyee import EventEmitter
from .core import InvalidStateError, ProtocolError, ConnectionError
from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError, ConnectionError
# -----------------------------------------------------------------------------
# Logging
@@ -634,7 +634,12 @@ class Multiplexer(EventEmitter):
if self.state == Multiplexer.OPENING:
self.change_state(Multiplexer.CONNECTED)
if self.open_result:
self.open_result.set_exception(ConnectionError(ConnectionError.CONNECTION_REFUSED))
self.open_result.set_exception(ConnectionError(
ConnectionError.CONNECTION_REFUSED,
BT_BR_EDR_TRANSPORT,
self.l2cap_channel.connection.peer_address,
'rfcomm'
))
else:
logger.warn(f'unexpected state for DM: {self}')

View File

@@ -477,6 +477,9 @@ class PairingDelegate:
async def accept(self):
return True
async def confirm(self):
return True
async def compare_numbers(self, number, digits=6):
return True
@@ -637,15 +640,16 @@ class Session:
self.oob = False
# Set up addresses
self_address = connection.self_address
peer_address = connection.peer_resolvable_address or connection.peer_address
if self.is_initiator:
self.ia = bytes(manager.address)
self.iat = 1 if manager.address.is_random else 0
self.ia = bytes(self_address)
self.iat = 1 if self_address.is_random else 0
self.ra = bytes(peer_address)
self.rat = 1 if peer_address.is_random else 0
else:
self.ra = bytes(manager.address)
self.rat = 1 if manager.address.is_random else 0
self.ra = bytes(self_address)
self.rat = 1 if self_address.is_random else 0
self.ia = bytes(peer_address)
self.iat = 1 if peer_address.is_random else 0
@@ -715,6 +719,21 @@ class Session:
return False
return True
def prompt_user_for_confirmation(self, next_steps):
async def prompt():
logger.debug('ask for confirmation')
try:
response = await self.pairing_config.delegate.confirm()
if response:
next_steps()
return
except Exception as error:
logger.warn(f'exception while confirm: {error}')
self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
asyncio.create_task(prompt())
def prompt_user_for_numeric_comparison(self, code, next_steps):
async def prompt():
logger.debug(f'verification code: {code}')
@@ -907,8 +926,8 @@ class Session:
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
)
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
addr_type = self.connection.self_address.address_type,
bd_addr = self.connection.self_address
))
# Distribute CSRK
@@ -939,8 +958,8 @@ class Session:
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
)
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
addr_type = self.connection.self_address.address_type,
bd_addr = self.connection.self_address
))
# Distribute CSRK
@@ -1091,7 +1110,7 @@ class Session:
self.manager.on_pairing(self, peer_address, keys)
def on_pairing_failure(self, reason):
logger.warn(f'pairing failure ({error_name(reason)})')
logger.warning(f'pairing failure ({error_name(reason)})')
if self.completed:
return
@@ -1387,12 +1406,12 @@ class Session:
# Compute the 6-digit code
code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000
if self.pairing_method == self.NUMERIC_COMPARISON:
# Ask for user confirmation
self.wait_before_continuing = asyncio.get_running_loop().create_future()
self.prompt_user_for_numeric_comparison(code, next_steps)
# Ask for user confirmation
self.wait_before_continuing = asyncio.get_running_loop().create_future()
if self.pairing_method == self.JUST_WORKS:
self.prompt_user_for_confirmation(next_steps)
else:
next_steps()
self.prompt_user_for_numeric_comparison(code, next_steps)
else:
next_steps()
@@ -1486,10 +1505,9 @@ class Manager(EventEmitter):
Implements the Initiator and Responder roles of the Security Manager Protocol
'''
def __init__(self, device, address):
def __init__(self, device):
super().__init__()
self.device = device
self.address = address
self.sessions = {}
self._ecc_key = None
self.pairing_config_factory = lambda connection: PairingConfig()
@@ -1565,7 +1583,7 @@ class Manager(EventEmitter):
asyncio.create_task(store_keys())
# Notify the device
self.device.on_pairing(session.connection.handle, keys)
self.device.on_pairing(session.connection.handle, keys, session.sc)
def on_pairing_failure(self, session, reason):
self.device.on_pairing_failure(session.connection.handle, reason)

View File

@@ -274,7 +274,7 @@ class PumpedPacketSource(ParserSource):
self.terminated.set_result(error)
break
self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
self.pump_task = asyncio.create_task(pump_packets())
def close(self):
if self.pump_task:
@@ -304,7 +304,7 @@ class PumpedPacketSink:
logger.warn(f'exception while sending packet: {error}')
break
self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
self.pump_task = asyncio.create_task(pump_packets())
def close(self):
if self.pump_task:

View File

@@ -414,14 +414,13 @@ async def open_usb_transport(spec):
device = found.open()
# Detach the kernel driver if supported and needed
# Auto-detach the kernel driver if supported
if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER):
try:
if device.kernelDriverActive(interface):
logger.debug("detaching kernel driver")
device.detachKernelDriver(interface)
except usb1.USBError:
pass
logger.debug('auto-detaching kernel driver')
device.setAutoDetachKernelDriver(True)
except usb1.USBError as error:
logger.warning(f'unable to auto-detach kernel driver: {error}')
# Set the configuration if needed
try:

View File

@@ -18,6 +18,7 @@
import asyncio
import logging
import traceback
import collections
from functools import wraps
from colors import color
from pyee import EventEmitter
@@ -140,3 +141,95 @@ class AsyncRunner:
return wrapper
return decorator
# -----------------------------------------------------------------------------
class FlowControlAsyncPipe:
"""
Asyncio pipe with flow control. When writing to the pipe, the source is
paused (by calling a function passed in when the pipe is created) if the
amount of queued data exceeds a specified threshold.
"""
def __init__(self, pause_source, resume_source, write_to_sink=None, drain_sink=None, threshold=0):
self.pause_source = pause_source
self.resume_source = resume_source
self.write_to_sink = write_to_sink
self.drain_sink = drain_sink
self.threshold = threshold
self.queue = collections.deque() # Queue of packets
self.queued_bytes = 0 # Number of bytes in the queue
self.ready_to_pump = asyncio.Event()
self.paused = False
self.source_paused = False
self.pump_task = None
def start(self):
if self.pump_task is None:
self.pump_task = asyncio.create_task(self.pump())
self.check_pump()
def stop(self):
if self.pump_task is not None:
self.pump_task.cancel()
self.pump_task = None
def write(self, packet):
self.queued_bytes += len(packet)
self.queue.append(packet)
# Pause the source if we're over the threshold
if self.queued_bytes > self.threshold and not self.source_paused:
logger.debug(f'pausing source (queued={self.queued_bytes})')
self.pause_source()
self.source_paused = True
self.check_pump()
def pause(self):
if not self.paused:
self.paused = True
if not self.source_paused:
self.pause_source()
self.source_paused = True
self.check_pump()
def resume(self):
if self.paused:
self.paused = False
if self.source_paused:
self.resume_source()
self.source_paused = False
self.check_pump()
def can_pump(self):
return self.queue and not self.paused and self.write_to_sink is not None
def check_pump(self):
if self.can_pump():
self.ready_to_pump.set()
else:
self.ready_to_pump.clear()
async def pump(self):
while True:
# Wait until we can try to pump packets
await self.ready_to_pump.wait()
# Try to pump a packet
if self.can_pump():
packet = self.queue.pop()
self.write_to_sink(packet)
self.queued_bytes -= len(packet)
# Drain the sink if we can
if self.drain_sink:
await self.drain_sink()
# Check if we can accept more
if self.queued_bytes <= self.threshold and self.source_paused:
logger.debug(f'resuming source (queued={self.queued_bytes})')
self.source_paused = False
self.resume_source()
self.check_pump()

5
examples/asha_sink1.json Normal file
View File

@@ -0,0 +1,5 @@
{
"name": "Bumble Aid Left",
"address": "F1:F2:F3:F4:F5:F6",
"keystore": "JsonKeyStore"
}

5
examples/asha_sink2.json Normal file
View File

@@ -0,0 +1,5 @@
{
"name": "Bumble Aid Right",
"address": "F7:F8:F9:FA:FB:FC",
"keystore": "JsonKeyStore"
}

View File

@@ -34,8 +34,8 @@ from bumble.gatt import (
Characteristic,
CharacteristicValue,
GATT_DEVICE_INFORMATION_SERVICE,
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
GATT_DEVICE_BATTERY_SERVICE,
GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
GATT_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_REPORT_CHARACTERISTIC,
@@ -126,8 +126,8 @@ async def keyboard_host(device, peer_address):
connection = await device.connect(peer_address)
await connection.pair()
peer = Peer(connection)
await peer.discover_service(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
hid_services = peer.get_services_by_uuid(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
await peer.discover_service(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
hid_services = peer.get_services_by_uuid(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
if not hid_services:
print(color('!!! No HID service', 'red'))
return
@@ -221,7 +221,7 @@ async def keyboard_device(device, command):
]
),
Service(
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
[
Characteristic(
GATT_PROTOCOL_MODE_CHARACTERISTIC,
@@ -252,7 +252,7 @@ async def keyboard_device(device, command):
]
),
Service(
GATT_DEVICE_BATTERY_SERVICE,
GATT_BATTERY_SERVICE,
[
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
@@ -273,7 +273,7 @@ async def keyboard_device(device, command):
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)),
bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
(AdvertisingData.FLAGS, bytes([0x05]))
])

161
examples/run_asha_sink.py Normal file
View File

@@ -0,0 +1,161 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import struct
import sys
import os
import logging
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.hci import UUID
from bumble.gatt import (
Service,
Characteristic,
CharacteristicValue
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC = UUID('f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint')
ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID('38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus')
ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 4:
print('Usage: python run_asha_sink.py <device-config> <transport-spec> <audio-file>')
print('example: python run_asha_sink.py device1.json usb:0 audio_out.g722')
return
audio_out = open(sys.argv[3], 'wb')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Handler for audio control commands
def on_audio_control_point_write(connection, value):
print('--- AUDIO CONTROL POINT Write:', value.hex())
opcode = value[0]
if opcode == 1:
# Start
audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
print(f'### START: codec={value[1]}, audio_type={audio_type}, volume={value[3]}, otherstate={value[4]}')
elif opcode == 2:
print('### STOP')
elif opcode == 3:
print(f'### STATUS: connected={value[1]}')
# Respond with a status
asyncio.create_task(device.notify_subscribers(audio_status_characteristic, force=True))
# Handler for volume control
def on_volume_write(connection, value):
print('--- VOLUME Write:', value[0])
# Register an L2CAP CoC server
def on_coc(channel):
def on_data(data):
print('<<< Voice data received:', data.hex())
audio_out.write(data)
channel.sink = on_data
psm = device.register_l2cap_channel_server(0, on_coc, 8)
print(f'### LE_PSM_OUT = {psm}')
# Add the ASHA service to the GATT server
read_only_properties_characteristic = Characteristic(
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([
0x01, # Version
0x00, # Device Capabilities [Left, Monaural]
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, # HiSyncId
0x01, # Feature Map [LE CoC audio output streaming supported]
0x00, 0x00, # Render Delay
0x00, 0x00, # RFU
0x02, 0x00 # Codec IDs [G.722 at 16 kHz]
])
)
audio_control_point_characteristic = Characteristic(
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write)
)
audio_status_characteristic = Characteristic(
ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([0])
)
volume_characteristic = Characteristic(
ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write)
)
le_psm_out_characteristic = Characteristic(
ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
struct.pack('<H', psm)
)
device.add_service(Service(
ASHA_SERVICE,
[
read_only_properties_characteristic,
audio_control_point_characteristic,
audio_status_characteristic,
volume_characteristic,
le_psm_out_characteristic
]
))
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(device.name, 'utf-8')),
(AdvertisingData.FLAGS, bytes([0x06])),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(ASHA_SERVICE)),
(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(ASHA_SERVICE) + bytes([
0x01, # Protocol Version
0x00, # Capability
0x01, 0x02, 0x03, 0x04 # Truncated HiSyncID
]))
])
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -30,7 +30,7 @@ from bumble.sdp import Client as SDP_Client, SDP_PUBLIC_BROWSE_ROOT, SDP_ALL_ATT
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) < 3:
print('Usage: run_classic_connect.py <device-config> <transport-spec> <bluetooth-address>')
print('Usage: run_classic_connect.py <device-config> <transport-spec> <bluetooth-addresses..>')
print('example: run_classic_connect.py classic1.json usb:04b4:f901 E1:CA:72:48:C4:E8')
return
@@ -43,8 +43,7 @@ async def main():
device.classic_enabled = True
await device.power_on()
# Connect to a peer
target_address = sys.argv[3]
async def connect(target_address):
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!')
@@ -76,6 +75,10 @@ async def main():
await sdp_client.disconnect()
await hci_source.wait_for_termination()
# Connect to a peer
target_addresses = sys.argv[3:]
await asyncio.wait([asyncio.create_task(connect(target_address)) for target_address in target_addresses])
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -49,11 +49,17 @@ class Listener(Device.Listener, Connection.Listener):
)
# -----------------------------------------------------------------------------
# Alternative way to listen for subscriptions
# -----------------------------------------------------------------------------
def on_my_characteristic_subscription(peer, enabled):
print(f'### My characteristic from {peer}: {"enabled" if enabled else "disabled"}')
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) < 3:
print('Usage: run_gatt_server.py <device-config> <transport-spec>')
print('example: run_gatt_server.py device1.json usb:0')
print('Usage: run_notifier.py <device-config> <transport-spec>')
print('example: run_notifier.py device1.json usb:0')
return
print('<<< connecting to HCI...')
@@ -83,6 +89,7 @@ async def main():
Characteristic.READABLE,
bytes([0x42])
)
characteristic3.on('subscription', on_my_characteristic_subscription)
custom_service = Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[characteristic1, characteristic2, characteristic3]

View File

@@ -98,6 +98,7 @@ async def list_rfcomm_channels(device, connection):
await sdp_client.disconnect()
# -----------------------------------------------------------------------------
class TcpServerProtocol(asyncio.Protocol):
def __init__(self, rfcomm_session):
@@ -173,7 +174,7 @@ async def main():
print('*** Encryption on')
# Create a client and start it
print('@@@ Starting to RFCOMM client...')
print('@@@ Starting RFCOMM client...')
rfcomm_client = Client(device, connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
@@ -192,7 +193,7 @@ async def main():
if len(sys.argv) == 6:
# A TCP port was specified, start listening
tcp_port = int(sys.argv[5])
asyncio.get_running_loop().create_task(tcp_server(tcp_port, session))
asyncio.create_task(tcp_server(tcp_port, session))
await hci_source.wait_for_termination()

View File

@@ -51,6 +51,7 @@ console_scripts =
bumble-controller-info = bumble.apps.controller_info:main
bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
bumble-pair = bumble.apps.pair:main
bumble-scan = bumble.apps.scan:main
bumble-show = bumble.apps.show:main
@@ -64,6 +65,8 @@ build =
test =
pytest >= 6.2
pytest-asyncio >= 0.17
pytest-html >= 3.2.0
coverage >= 6.4
development =
invoke >= 1.4
nox >= 2022

View File

@@ -52,8 +52,9 @@ build_tasks.add_task(mkdocs, name="mkdocs")
test_tasks = Collection()
ns.add_collection(test_tasks, name="test")
@task
def test(ctx, filter=None, junit=False, install=False):
@task(incrementable=["verbose"])
def test(ctx, filter=None, junit=False, install=False, html=False, verbose=0):
# Install the package before running the tests
if install:
ctx.run("python -m pip install .[test]")
@@ -62,8 +63,12 @@ def test(ctx, filter=None, junit=False, install=False):
if junit:
args += "--junit-xml test-results.xml"
if filter is not None:
args += " -k '{}'".format(filter)
ctx.run("python -m pytest {} {}".format(os.path.join(ROOT_DIR, "tests"), args))
args += f" -k '{filter}'"
if html:
args += " --html results.html"
if verbose > 0:
args += f" -{'v' * verbose}"
ctx.run(f"python -m pytest {os.path.join(ROOT_DIR, 'tests')} {args}")
test_tasks.add_task(test, default=True)

View File

@@ -15,8 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from bumble.core import AdvertisingData
from bumble.core import AdvertisingData, get_dict_key_by_value
# -----------------------------------------------------------------------------
def test_ad_data():
@@ -24,21 +23,31 @@ def test_ad_data():
ad = AdvertisingData.from_bytes(data)
ad_bytes = bytes(ad)
assert(data == ad_bytes)
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME) is None)
assert(ad.get(AdvertisingData.TX_POWER_LEVEL) == bytes([123]))
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True) == [])
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True) == [bytes([123])])
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) is None)
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, raw=True) == bytes([123]))
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True, raw=True) == [])
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123])])
data2 = bytes([2, AdvertisingData.TX_POWER_LEVEL, 234])
ad.append(data2)
ad_bytes = bytes(ad)
assert(ad_bytes == data + data2)
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME) is None)
assert(ad.get(AdvertisingData.TX_POWER_LEVEL) == bytes([123]))
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True) == [])
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True) == [bytes([123]), bytes([234])])
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) is None)
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, raw=True) == bytes([123]))
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True, raw=True) == [])
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123]), bytes([234])])
# -----------------------------------------------------------------------------
def test_get_dict_key_by_value():
dictionary = {
"A": 1,
"B": 2
}
assert get_dict_key_by_value(dictionary, 1) == "A"
assert get_dict_key_by_value(dictionary, 2) == "B"
assert get_dict_key_by_value(dictionary, 3) is None
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_ad_data()

209
tests/device_test.py Normal file
View File

@@ -0,0 +1,209 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
from types import LambdaType
import pytest
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Connection, Device
from bumble.host import Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, HCI_COMMAND_STATUS_PENDING, HCI_CREATE_CONNECTION_COMMAND, HCI_SUCCESS,
Address, HCI_Command_Complete_Event, HCI_Command_Status_Event, HCI_Connection_Complete_Event, HCI_Connection_Request_Event, HCI_Packet
)
from bumble.gatt import GATT_GENERIC_ACCESS_SERVICE, GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, GATT_DEVICE_NAME_CHARACTERISTIC, GATT_APPEARANCE_CHARACTERISTIC
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class Sink:
def __init__(self, flow):
self.flow = flow
next(self.flow)
def on_packet(self, packet):
self.flow.send(packet)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_device_connect_parallel():
d0 = Device(host=Host(None, None))
d1 = Device(host=Host(None, None))
d2 = Device(host=Host(None, None))
# enable classic
d0.classic_enabled = True
d1.classic_enabled = True
d2.classic_enabled = True
# set public addresses
d0.public_address = Address('F0:F1:F2:F3:F4:F5', address_type=Address.PUBLIC_DEVICE_ADDRESS)
d1.public_address = Address('F5:F4:F3:F2:F1:F0', address_type=Address.PUBLIC_DEVICE_ADDRESS)
d2.public_address = Address('F5:F4:F3:F3:F4:F5', address_type=Address.PUBLIC_DEVICE_ADDRESS)
def d0_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_CREATE_CONNECTION_COMMAND'
assert packet.bd_addr == d1.public_address
d0.host.on_hci_packet(HCI_Command_Status_Event(
status = HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets = 1,
command_opcode = HCI_CREATE_CONNECTION_COMMAND
))
d1.host.on_hci_packet(HCI_Connection_Request_Event(
bd_addr = d0.public_address,
class_of_device = 0,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE
))
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_CREATE_CONNECTION_COMMAND'
assert packet.bd_addr == d2.public_address
d0.host.on_hci_packet(HCI_Command_Status_Event(
status = HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets = 1,
command_opcode = HCI_CREATE_CONNECTION_COMMAND
))
d2.host.on_hci_packet(HCI_Connection_Request_Event(
bd_addr = d0.public_address,
class_of_device = 0,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE
))
assert (yield) == None
def d1_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_ACCEPT_CONNECTION_REQUEST_COMMAND'
d1.host.on_hci_packet(HCI_Command_Complete_Event(
num_hci_command_packets = 1,
command_opcode = HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
return_parameters = b"\x00"
))
d1.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x100,
bd_addr = d0.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
d0.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x100,
bd_addr = d1.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
assert (yield) == None
def d2_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_ACCEPT_CONNECTION_REQUEST_COMMAND'
d2.host.on_hci_packet(HCI_Command_Complete_Event(
num_hci_command_packets = 1,
command_opcode = HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
return_parameters = b"\x00"
))
d2.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x101,
bd_addr = d0.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
d0.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x101,
bd_addr = d2.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
assert (yield) == None
d0.host.set_packet_sink(Sink(d0_flow()))
d1.host.set_packet_sink(Sink(d1_flow()))
d2.host.set_packet_sink(Sink(d2_flow()))
[c01, c02, a10, a20, a01] = await asyncio.gather(*[
asyncio.create_task(d0.connect(d1.public_address, transport=BT_BR_EDR_TRANSPORT)),
asyncio.create_task(d0.connect(d2.public_address, transport=BT_BR_EDR_TRANSPORT)),
asyncio.create_task(d1.accept(peer_address=d0.public_address)),
asyncio.create_task(d2.accept()),
asyncio.create_task(d0.accept(peer_address=d1.public_address)),
])
assert type(c01) == Connection
assert type(c02) == Connection
assert type(a10) == Connection
assert type(a20) == Connection
assert type(a01) == Connection
assert c01.handle == a10.handle and c01.handle == 0x100
assert c02.handle == a20.handle and c02.handle == 0x101
assert a01 == c01
# -----------------------------------------------------------------------------
async def run_test_device():
await test_device_connect_parallel()
# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
device = Device(host=Host(None, None))
# there should be one service and two chars, therefore 5 attributes
assert len(device.gatt_server.attributes) == 5
assert device.gatt_server.attributes[0].uuid == GATT_GENERIC_ACCESS_SERVICE
assert device.gatt_server.attributes[1].type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
assert device.gatt_server.attributes[2].uuid == GATT_DEVICE_NAME_CHARACTERISTIC
assert device.gatt_server.attributes[3].type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
assert device.gatt_server.attributes[4].uuid == GATT_APPEARANCE_CHARACTERISTIC
# -----------------------------------------------------------------------------
def test_gatt_services_without_gas():
device = Device(host=Host(None, None), generic_access_service=False)
# there should be no services
assert len(device.gatt_server.attributes) == 0
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run_test_device())

View File

@@ -28,6 +28,7 @@ from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
@@ -226,6 +227,37 @@ async def test_characteristic_encoding():
assert last_change is None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
[client, server] = LinkedDevices().devices[:2]
characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
characteristic = Characteristic(
characteristic_uuid,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123])
)
service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')
service = Service(service_uuid, [characteristic])
server.add_service(service)
service_attr = server.gatt_server.get_service_attribute(service_uuid)
assert service_attr
(char_decl_attr, char_value_attr) = server.gatt_server.get_characteristic_attributes(service_uuid, characteristic_uuid)
assert char_decl_attr and char_value_attr
desc_attr = server.gatt_server.get_descriptor_attribute(service_uuid, characteristic_uuid, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR)
assert desc_attr
# assert all handles are in expected order
assert service_attr.handle < char_decl_attr.handle < char_value_attr.handle < desc_attr.handle == service_attr.end_group_handle
# assert characteristic declarations attribute is followed by characteristic value attribute
assert char_decl_attr.handle + 1 == char_value_attr.handle
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
@@ -612,14 +644,25 @@ async def test_subscribe_notify():
await async_barrier()
assert not c2._called
c3._called = False
c3._called_2 = False
c3._called_3 = False
c3._last_update = None
c3._last_update_2 = None
c3._last_update_3 = None
def on_c3_update(value):
c3._called = True
c3._last_update = value
def on_c3_update_2(value):
def on_c3_update_2(value): # for notify
c3._called_2 = True
c3._last_update_2 = value
def on_c3_update_3(value): # for indicate
c3._called_3 = True
c3._last_update_3 = value
c3.on('update', on_c3_update)
await peer.subscribe(c3, on_c3_update_2)
await async_barrier()
@@ -629,22 +672,33 @@ async def test_subscribe_notify():
assert c3._last_update == characteristic3.value
assert c3._called_2
assert c3._last_update_2 == characteristic3.value
assert not c3._called_3
c3._called = False
c3._called_2 = False
c3._called_3 = False
await peer.unsubscribe(c3)
await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
await async_barrier()
characteristic3.value = bytes([1, 2, 3])
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier()
assert c3._called
assert c3._last_update == characteristic3.value
assert c3._called_2
assert c3._last_update_2 == characteristic3.value
assert not c3._called_2
assert c3._called_3
assert c3._last_update_3 == characteristic3.value
c3._called = False
c3._called_2 = False
c3._called_3 = False
await peer.unsubscribe(c3)
await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier()
assert not c3._called
assert not c3._called_2
assert not c3._called_3
# -----------------------------------------------------------------------------
@@ -683,6 +737,55 @@ async def test_mtu_exchange():
assert d2_connection.att_mtu == 50
# -----------------------------------------------------------------------------
def test_char_property_to_string():
# single
assert Characteristic.property_name(0x01) == "BROADCAST"
assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST"
# double
assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ"
assert Characteristic.properties_as_string(Characteristic.BROADCAST | Characteristic.READ) == "BROADCAST,READ"
# -----------------------------------------------------------------------------
def test_char_property_string_to_type():
# single
assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST
# double
assert Characteristic.string_to_properties("BROADCAST,READ") == Characteristic.BROADCAST | Characteristic.READ
assert Characteristic.string_to_properties("READ,BROADCAST") == Characteristic.BROADCAST | Characteristic.READ
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_server_string():
[_, server] = LinkedDevices().devices[:2]
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123])
)
service = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[characteristic]
)
server.add_service(service)
assert str(server.gatt_server) == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
# -----------------------------------------------------------------------------
async def async_main():
await test_read_write()

View File

@@ -59,7 +59,7 @@ def test_HCI_LE_Connection_Complete_Event():
# -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event():
address = Address('00:11:22:33:44:55')
address = Address('00:11:22:33:44:55/P')
report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.Report.FIELDS,
event_type = HCI_LE_Advertising_Report_Event.ADV_IND,

View File

@@ -64,37 +64,37 @@ def test_import():
# -----------------------------------------------------------------------------
def test_app_imports():
from bumble.apps.console import main
from apps.console import main
assert main
from bumble.apps.controller_info import main
from apps.controller_info import main
assert main
from bumble.apps.controllers import main
from apps.controllers import main
assert main
from bumble.apps.gatt_dump import main
from apps.gatt_dump import main
assert main
from bumble.apps.gg_bridge import main
from apps.gg_bridge import main
assert main
from bumble.apps.hci_bridge import main
from apps.hci_bridge import main
assert main
from bumble.apps.pair import main
from apps.pair import main
assert main
from bumble.apps.scan import main
from apps.scan import main
assert main
from bumble.apps.show import main
from apps.show import main
assert main
from bumble.apps.unbond import main
from apps.unbond import main
assert main
from bumble.apps.usb_probe import main
from apps.usb_probe import main
assert main

284
tests/l2cap_test.py Normal file
View File

@@ -0,0 +1,284 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import random
import pytest
from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.device import Device
from bumble.host import Host
from bumble.transport import AsyncPipeSink
from bumble.core import ProtocolError
from bumble.l2cap import (
L2CAP_Connection_Request
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
self.link = LocalLink()
self.controllers = [
Controller('C1', link = self.link),
Controller('C2', link = self.link)
]
self.devices = [
Device(
address = 'F0:F1:F2:F3:F4:F5',
host = Host(self.controllers[0], AsyncPipeSink(self.controllers[0]))
),
Device(
address = 'F5:F4:F3:F2:F1:F0',
host = Host(self.controllers[1], AsyncPipeSink(self.controllers[1]))
)
]
self.paired = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
def on_paired(self, which, keys):
self.paired[which] = keys
# -----------------------------------------------------------------------------
async def setup_connection():
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on('connection', lambda connection: two_devices.on_connection(0, connection))
two_devices.devices[1].on('connection', lambda connection: two_devices.on_connection(1, connection))
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await two_devices.devices[0].connect(two_devices.devices[1].random_address)
# Check the post conditions
assert two_devices.connections[0] is not None
assert two_devices.connections[1] is not None
return two_devices
# -----------------------------------------------------------------------------
def test_helpers():
psm = L2CAP_Connection_Request.serialize_psm(0x01)
assert psm == bytes([0x01, 0x00])
psm = L2CAP_Connection_Request.serialize_psm(0x1023)
assert psm == bytes([0x23, 0x10])
psm = L2CAP_Connection_Request.serialize_psm(0x242311)
assert psm == bytes([0x11, 0x23, 0x24])
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x01, 0x00, 0x44]), 1)
assert offset == 3
assert psm == 0x01
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x23, 0x10, 0x44]), 1)
assert offset == 3
assert psm == 0x1023
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x11, 0x23, 0x24, 0x44]), 1)
assert offset == 4
assert psm == 0x242311
rq = L2CAP_Connection_Request(psm = 0x01, source_cid = 0x44)
brq = bytes(rq)
srq = L2CAP_Connection_Request.from_bytes(brq)
assert srq.psm == rq.psm
assert srq.source_cid == rq.source_cid
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_basic_connection():
devices = await setup_connection()
psm = 1234
# Check that if there's no one listening, we can't connect
with pytest.raises(ProtocolError):
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
# Now add a listener
incoming_channel = None
received = []
def on_coc(channel):
nonlocal incoming_channel
incoming_channel = channel
def on_data(data):
received.append(data)
channel.sink = on_data
devices.devices[1].register_l2cap_channel_server(psm, on_coc)
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
messages = (
bytes([1, 2, 3]),
bytes([4, 5, 6]),
bytes(10000)
)
for message in messages:
l2cap_channel.write(message)
await asyncio.sleep(0)
await l2cap_channel.drain()
# Test closing
closed = [False, False]
closed_event = asyncio.Event()
def on_close(which, event):
closed[which] = True
if event:
event.set()
l2cap_channel.on('close', lambda: on_close(0, None))
incoming_channel.on('close', lambda: on_close(1, closed_event))
await l2cap_channel.disconnect()
assert closed == [True, True]
await closed_event.wait()
sent_bytes = b''.join(messages)
received_bytes = b''.join(received)
assert sent_bytes == received_bytes
# -----------------------------------------------------------------------------
async def transfer_payload(max_credits, mtu, mps):
devices = await setup_connection()
received = []
def on_coc(channel):
def on_data(data):
received.append(data)
channel.sink = on_data
psm = devices.devices[1].register_l2cap_channel_server(
psm = 0,
server = on_coc,
max_credits = max_credits,
mtu = mtu,
mps = mps
)
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
messages = [
bytes([1, 2, 3, 4, 5, 6, 7]) * x
for x in (3, 10, 100, 789)
]
for message in messages:
l2cap_channel.write(message)
await asyncio.sleep(0)
if random.randint(0, 5) == 1:
await l2cap_channel.drain()
await l2cap_channel.drain()
await l2cap_channel.disconnect()
sent_bytes = b''.join(messages)
received_bytes = b''.join(received)
assert sent_bytes == received_bytes
@pytest.mark.asyncio
async def test_transfer():
for max_credits in (1, 10, 100, 10000):
for mtu in (50, 255, 256, 1000):
for mps in (50, 255, 256, 1000):
# print(max_credits, mtu, mps)
await transfer_payload(max_credits, mtu, mps)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_bidirectional_transfer():
devices = await setup_connection()
client_received = []
server_received = []
server_channel = None
def on_server_coc(channel):
nonlocal server_channel
server_channel = channel
def on_server_data(data):
server_received.append(data)
channel.sink = on_server_data
def on_client_data(data):
client_received.append(data)
psm = devices.devices[1].register_l2cap_channel_server(psm=0, server=on_server_coc)
client_channel = await devices.connections[0].open_l2cap_channel(psm)
client_channel.sink = on_client_data
messages = [
bytes([1, 2, 3, 4, 5, 6, 7]) * x
for x in (3, 10, 100)
]
for message in messages:
client_channel.write(message)
await client_channel.drain()
await asyncio.sleep(0)
server_channel.write(message)
await server_channel.drain()
await client_channel.disconnect()
message_bytes = b''.join(messages)
client_received_bytes = b''.join(client_received)
server_received_bytes = b''.join(server_received)
assert client_received_bytes == message_bytes
assert server_received_bytes == message_bytes
# -----------------------------------------------------------------------------
async def run():
test_helpers()
await test_basic_connection()
await test_transfer()
await test_bidirectional_transfer()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())