Compare commits

...

80 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
e7e9f9509a update rootcanal version 2024-02-02 20:33:19 -08:00
Gilles Boccon-Gibod
8d2f37aa7a inclusive language 2024-01-28 19:09:39 -08:00
Gilles Boccon-Gibod
b7b70ebcbb address PR comments 2024-01-28 19:09:37 -08:00
Gilles Boccon-Gibod
8ba91f4986 fix assert 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
79a5e953bc comply with limits for certain advertising event types 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
20de5ea250 format 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
bad9ce272c add doc 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
d3273ffa8c format (+3 squashed commits)
Squashed commits:
[60e610f] wip
[eeab73d] wip
[3cdd5b8] basic first pass
2024-01-28 19:02:30 -08:00
zxzxwu
071fc2723a Merge pull request #376 from zxzxwu/host
Manage lifecycle of CIS and SCO links in host
2024-01-28 22:09:08 +08:00
zxzxwu
ef4ea86f58 Merge pull request #381 from zxzxwu/offload
Support non-directed address generation offload
2024-01-28 22:08:32 +08:00
Gilles Boccon-Gibod
dfdaa149d0 Merge pull request #337 from google/gbg/avrcp
Add AVRCP support
2024-01-28 01:27:52 -08:00
Gilles Boccon-Gibod
986343a807 support multiple type checkers for pandora 2024-01-28 01:21:50 -08:00
Gilles Boccon-Gibod
5211d7ba96 revert to older pytest_asyncio 2024-01-28 01:10:31 -08:00
Gilles Boccon-Gibod
a167342778 deal with SupportsBytes for python <= 3.10 2024-01-28 01:04:13 -08:00
Gilles Boccon-Gibod
1efb8cdbee use matrixed python version 2024-01-28 00:34:42 -08:00
Gilles Boccon-Gibod
80d83e6a70 upgrade to mypy 1.8.0 2024-01-28 00:26:50 -08:00
Gilles Boccon-Gibod
31ec1c41ce cleanup 2024-01-28 00:07:31 -08:00
Gilles Boccon-Gibod
aba1ac0cea use a dict instead of a series of ifs (+6 squashed commits)
Squashed commits:
[90f2024] fix import order
[0edd321] add a few docstrings
[77a0ac0] wip
[adcf159] wip
[96cbd67] wip
[d8bfbab] wip (+1 squashed commit)
Squashed commits:
[43b4d66] wip (+2 squashed commits)
Squashed commits:
[3dafaa8] wip
[5844026] wip (+1 squashed commit)
Squashed commits:
[4cbb35a] wip (+1 squashed commit)
Squashed commits:
[4d2b6d3] wip (+4 squashed commits)
Squashed commits:
[f2da510] wip
[318c119] wip
[923b4eb] wip
[9d46365] wip

use a dict instead of a series of ifs (+6 squashed commits)
Squashed commits:
[90f2024] fix import order
[0edd321] add a few docstrings
[77a0ac0] wip
[adcf159] wip
[96cbd67] wip
[d8bfbab] wip
2024-01-27 16:26:17 -08:00
Josh Wu
c40824e51c Support non-directed address generation offload 2024-01-26 16:02:40 +08:00
Gilles Boccon-Gibod
2920f05dae Merge pull request #411 from AlanRosenthal/main
Add bumble-controller-loopback console_scripts
2024-01-24 13:20:19 -08:00
Alan Rosenthal
bc911d6da0 Add bumble-controller-loopback console_scripts 2024-01-24 14:07:35 -05:00
Gilles Boccon-Gibod
4f87f587e4 Merge pull request #409 from google/gbg/root-canal-update
update to rootcanal 1.4
2024-01-22 15:07:20 -08:00
Gilles Boccon-Gibod
3e38ab3638 update to rootcanal 1.4 2024-01-22 12:19:12 -08:00
Gilles Boccon-Gibod
21bb911fea Merge pull request #408 from suneeshs/btbench-update
Update the Bumble BT Bench AndroidManifest.xml
2024-01-22 12:15:35 -08:00
Suneesh Sasikumar
744dfa33a2 Update the Bumble BT Bench AndroidManifest.xml 2024-01-22 13:46:55 -05:00
zxzxwu
ec5f8535a8 Merge pull request #405 from zxzxwu/adv
Make Advertisement dataclass
2024-01-20 11:05:41 +08:00
Gilles Boccon-Gibod
5a83734a00 Merge pull request #388 from google/gbg/scan-with-irk
allow passing IRKs as arguments to scan.py
2024-01-19 11:37:02 -08:00
Josh Wu
b4ae8af3a7 Typing Advertisement 2024-01-19 15:16:24 +08:00
Josh Wu
da60386385 Manage lifecycle of CIS and SCO links in host 2024-01-18 11:56:38 +08:00
zxzxwu
45c4c4f4c5 Merge pull request #404 from zxzxwu/cis
Fix HCI_LE_Set_Host_Feature_Command
2024-01-18 10:56:05 +08:00
zxzxwu
9187c75d68 Merge pull request #397 from zxzxwu/controller
Controller: CIS implementation
2024-01-18 10:55:37 +08:00
zxzxwu
abeec22546 Merge pull request #402 from zxzxwu/key
Save Link Key in CTKD over BR/EDR
2024-01-18 10:55:14 +08:00
Josh Wu
a6bab755cf Fix HCI_LE_Set_Host_Feature_Command 2024-01-17 22:15:15 +08:00
Josh Wu
acd9d994c3 Save link_key in CTKD over BR/EDR
Since keystore.update() overwrites all existing keys, the existing link
key will be wiped out. To avoid this, SMP also need to keep the key.
2024-01-17 19:30:02 +08:00
Gilles Boccon-Gibod
37afda3ed3 Merge pull request #399 from google/gbg/hotfix-002
fix uninitialized variable
2024-01-16 17:29:48 -08:00
Gilles Boccon-Gibod
54f2981267 fix uninitialized variable 2024-01-16 16:49:06 -08:00
Charlie Boutier
bb025514e7 PandoraHost: compute advertising_interval_max with interval_range 2024-01-12 14:36:22 -08:00
Charlie Boutier
e228597269 Pandora host: support advertising interval in advertise 2024-01-12 14:36:22 -08:00
Gilles Boccon-Gibod
95b0d6c6f2 Merge pull request #398 from google/gbg/rfcomm-no-sink
update credits even without a sink
2024-01-11 13:18:15 -08:00
Josh Wu
fa4df6e3a2 Controller: CIS implementation 2024-01-11 01:16:42 +08:00
zxzxwu
46ceea7ecd Merge pull request #391 from zxzxwu/remote_feature
LE read remote features
2024-01-10 15:51:32 +08:00
Gilles Boccon-Gibod
30f89d5739 simplify 2024-01-09 18:01:34 -08:00
Gilles Boccon-Gibod
481cf40831 update credits even without a sink 2024-01-09 17:58:52 -08:00
Josh Wu
eff05afb7a LE read remote features 2024-01-09 11:30:08 +08:00
zxzxwu
d8e6700611 Merge pull request #383 from zxzxwu/controller
Controller: SCO implementation
2024-01-09 09:39:13 +08:00
Gilles Boccon-Gibod
56eb5a933b Merge pull request #394 from google/gbg/hci-latency
add support for HCI latency probing
2024-01-08 09:21:00 -08:00
Gilles Boccon-Gibod
caacc0c133 Merge pull request #395 from google/gbg/loopback-quick-fix
compatibility with recent host ACL property changes
2024-01-08 09:20:45 -08:00
Gilles Boccon-Gibod
5f377c024b format 2024-01-05 12:26:54 -08:00
Gilles Boccon-Gibod
00cd8fbdd0 compatibility with recent host ACL property changes 2024-01-05 12:17:09 -08:00
Gilles Boccon-Gibod
aeeff18428 add support for HCI latency probing 2024-01-05 10:26:04 -08:00
Michael Mogenson
c48e3f5e9c Merge pull request #393 from mogenson/controller-loopback
apps: Add a controller loopback throughput test app
2024-01-05 13:13:30 -05:00
Michael Mogenson
d6bbc1145a apps: Add a controller loopback throughput test app
Add a command line utility to open a transport to a BT controller, put
the controller into local loopback mode, and send and receive ACL data
packets. Record the time it takes to send and receive all packets and
calculate a throughput measurement in kB/s.

This utility is usefull for characterizing the speed of a transport to a
BT controller (such as a TCP socket or serial port) without having to
deal with a connected peer or the variability of over the air
transmissions.

The transport CLI argument is required. The packet size and packet
count arguments are optional. They default to the same values as the
bumble-bench app.
2024-01-05 10:01:24 -05:00
zxzxwu
e2fec67bd9 Merge pull request #390 from zxzxwu/csip
CSIP: Encrypted SIRK implementation
2024-01-04 13:28:23 +08:00
Josh Wu
88cb3b2a4d IWYU in CSIP 2024-01-04 13:22:09 +08:00
zxzxwu
9ebb03be46 Merge pull request #389 from zxzxwu/gitignore
.gitignore: Add venv directories
2024-01-04 12:54:30 +08:00
Gilles Boccon-Gibod
80d84af76c Merge pull request #392 from google/gbg/l2cap-drain
l2cap & rfcomm drain support
2024-01-03 09:59:36 -08:00
Gilles Boccon-Gibod
8f4721758f fix typo 2024-01-03 09:53:17 -08:00
Gilles Boccon-Gibod
8864af4acd format 2024-01-02 11:35:11 -08:00
Gilles Boccon-Gibod
8980fb8cc7 add drain support and a few tool options 2024-01-02 11:07:52 -08:00
Josh Wu
2c5f3472a9 CSIP: Encrypted SIRK implementation 2023-12-30 16:06:42 +08:00
Josh Wu
f18277ac78 Ignore venv directories 2023-12-30 14:23:35 +08:00
Josh Wu
8d46bc04d2 Controller: SCO implementation 2023-12-30 14:22:58 +08:00
Gilles Boccon-Gibod
09e5ea5dec Merge pull request #387 from google/gbg/async-gatt-server
support async read/write for characteristic values
2023-12-29 11:28:22 -08:00
Gilles Boccon-Gibod
d43281c57e allow passing IRKs as arguments 2023-12-28 14:35:23 -08:00
Gilles Boccon-Gibod
6810865670 Merge pull request #385 from google/gbg/android-enable-dle
request MTU change after connection
2023-12-28 13:46:25 -08:00
Gilles Boccon-Gibod
3e9e06a02c Merge pull request #386 from AlanRosenthal/main
app/bench.py: use logging rather than print()
2023-12-28 13:42:17 -08:00
Alan Rosenthal
ccd12f6591 app/bench.py: use logging rather than print() 2023-12-28 16:06:50 -05:00
Gilles Boccon-Gibod
f9a7843f7e request MTU change after connection 2023-12-28 11:17:18 -08:00
Gilles Boccon-Gibod
210c334db7 Merge pull request #380 from google/gbg/classic-buffer-size
support per-transport ACL queues
2023-12-28 09:24:52 -08:00
Gilles Boccon-Gibod
f297cdfcce Merge pull request #384 from eukub/string-concatination-to-fstring
сhanged concatenation of strings to f-strings to improve readability
2023-12-28 09:24:25 -08:00
eukub
5b536d00ab сhanged concatenation of strings to f-strings to improve readability and unify with the rest of code 2023-12-28 16:27:36 +03:00
Gilles Boccon-Gibod
b4af46ebd5 use TCP_NODELAY on socket 2023-12-27 12:11:20 -08:00
Gilles Boccon-Gibod
c08da3193e format 2023-12-27 11:56:06 -08:00
Gilles Boccon-Gibod
f2925ca647 support async read/write for characteristic values 2023-12-27 11:52:22 -08:00
Gilles Boccon-Gibod
fd4d68e5c0 print controller flow control info 2023-12-26 13:24:24 -08:00
Gilles Boccon-Gibod
b90d0f8710 fix tests 2023-12-26 09:09:20 -08:00
Gilles Boccon-Gibod
afc6d19e04 address PR comments 2023-12-23 14:21:44 -08:00
Gilles Boccon-Gibod
c05f073b33 Update bumble/host.py
Co-authored-by: zxzxwu <92432172+zxzxwu@users.noreply.github.com>
2023-12-23 14:15:53 -08:00
Gilles Boccon-Gibod
2b4c2a22f4 format 2023-12-22 14:22:08 -08:00
Gilles Boccon-Gibod
47fe93a148 support per-transport ACL queues 2023-12-22 13:52:33 -08:00
65 changed files with 7333 additions and 1448 deletions

View File

@@ -29,7 +29,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: '3.10'
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip

2
.gitignore vendored
View File

@@ -10,3 +10,5 @@ __pycache__
bumble/_version.py
.vscode/launch.json
/.idea
venv/
.venv/

View File

@@ -12,7 +12,9 @@
"ASHA",
"asyncio",
"ATRAC",
"avctp",
"avdtp",
"avrcp",
"bitpool",
"bitstruct",
"BSCP",
@@ -22,6 +24,7 @@
"cmac",
"CONNECTIONLESS",
"csip",
"csis",
"csrcs",
"CVSD",
"datagram",
@@ -32,6 +35,7 @@
"dhkey",
"diversifier",
"endianness",
"ESCO",
"Fitbit",
"GATTLINK",
"HANDSFREE",

File diff suppressed because it is too large Load Diff

View File

@@ -777,7 +777,7 @@ class ConsoleApp:
if not service:
continue
values = [
attribute.read_value(connection)
await attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
@@ -796,11 +796,11 @@ class ConsoleApp:
if not characteristic:
continue
values = [
attribute.read_value(connection)
await attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
values = [attribute.read_value(None)]
values = [await attribute.read_value(None)]
# TODO: future optimization: convert CCCD value to human readable string
@@ -944,7 +944,7 @@ class ConsoleApp:
# send data to any subscribers
if isinstance(attribute, Characteristic):
attribute.write_value(None, value)
await attribute.write_value(None, value)
if attribute.has_properties(Characteristic.NOTIFY):
await self.device.gatt_server.notify_subscribers(attribute)
if attribute.has_properties(Characteristic.INDICATE):

View File

@@ -18,24 +18,30 @@
import asyncio
import os
import logging
import click
from bumble.company_ids import COMPANY_IDENTIFIERS
import time
import click
from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.colors import color
from bumble.core import name_or_number
from bumble.hci import (
map_null_terminated_utf8_string,
LeFeatureMask,
HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_Read_Buffer_Size_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
HCI_Read_Local_Name_Command,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
@@ -44,6 +50,7 @@ from bumble.hci import (
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_Read_Local_Version_Information_Command,
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
@@ -59,7 +66,7 @@ def command_succeeded(response):
# -----------------------------------------------------------------------------
async def get_classic_info(host):
async def get_classic_info(host: Host) -> None:
if host.supports_command(HCI_READ_BD_ADDR_COMMAND):
response = await host.send_command(HCI_Read_BD_ADDR_Command())
if command_succeeded(response):
@@ -80,7 +87,7 @@ async def get_classic_info(host):
# -----------------------------------------------------------------------------
async def get_le_info(host):
async def get_le_info(host: Host) -> None:
print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
@@ -133,11 +140,36 @@ async def get_le_info(host):
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))
print(LeFeatureMask(feature).name)
# -----------------------------------------------------------------------------
async def async_main(transport):
async def get_acl_flow_control_info(host: Host) -> None:
print()
if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
response = await host.send_command(
HCI_Read_Buffer_Size_Command(), check_result=True
)
print(
color('ACL Flow Control:', 'yellow'),
f'{response.return_parameters.hc_total_num_acl_data_packets} '
f'packets of size {response.return_parameters.hc_acl_data_packet_length}',
)
if host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await host.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
print(
color('LE ACL Flow Control:', 'yellow'),
f'{response.return_parameters.hc_total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.hc_le_acl_data_packet_length}',
)
# -----------------------------------------------------------------------------
async def async_main(latency_probes, transport):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
print('<<< connected')
@@ -145,6 +177,23 @@ async def async_main(transport):
host = Host(hci_source, hci_sink)
await host.reset()
# Measure the latency if requested
latencies = []
if latency_probes:
for _ in range(latency_probes):
start = time.time()
await host.send_command(HCI_Read_Local_Version_Information_Command())
latencies.append(1000 * (time.time() - start))
print(
color('HCI Command Latency:', 'yellow'),
(
f'min={min(latencies):.2f}, '
f'max={max(latencies):.2f}, '
f'average={sum(latencies)/len(latencies):.2f}'
),
'\n',
)
# Print version
print(color('Version:', 'yellow'))
print(
@@ -168,6 +217,9 @@ async def async_main(transport):
# Get the LE info
await get_le_info(host)
# Print the ACL flow control info
await get_acl_flow_control_info(host)
# Print the list of commands supported by the controller
print()
print(color('Supported Commands:', 'yellow'))
@@ -177,10 +229,16 @@ async def async_main(transport):
# -----------------------------------------------------------------------------
@click.command()
@click.option(
'--latency-probes',
metavar='N',
type=int,
help='Send N commands to measure HCI transport latency statistics',
)
@click.argument('transport')
def main(transport):
def main(latency_probes, transport):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(async_main(transport))
asyncio.run(async_main(latency_probes, transport))
# -----------------------------------------------------------------------------

200
apps/controller_loopback.py Normal file
View File

@@ -0,0 +1,200 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import time
from typing import Optional
from bumble.colors import color
from bumble.hci import (
HCI_READ_LOOPBACK_MODE_COMMAND,
HCI_Read_Loopback_Mode_Command,
HCI_WRITE_LOOPBACK_MODE_COMMAND,
HCI_Write_Loopback_Mode_Command,
LoopbackMode,
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
import click
class Loopback:
"""Send and receive ACL data packets in local loopback mode"""
def __init__(self, packet_size: int, packet_count: int, transport: str):
self.transport = transport
self.packet_size = packet_size
self.packet_count = packet_count
self.connection_handle: Optional[int] = None
self.connection_event = asyncio.Event()
self.done = asyncio.Event()
self.expected_cid = 0
self.bytes_received = 0
self.start_timestamp = 0.0
self.last_timestamp = 0.0
def on_connection(self, connection_handle: int, *args):
"""Retrieve connection handle from new connection event"""
if not self.connection_event.is_set():
# save first connection handle for ACL
# subsequent connections are SCO
self.connection_handle = connection_handle
self.connection_event.set()
def on_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes):
"""Calculate packet receive speed"""
now = time.time()
print(f'<<< Received packet {cid}: {len(pdu)} bytes')
assert connection_handle == self.connection_handle
assert cid == self.expected_cid
self.expected_cid += 1
if cid == 0:
self.start_timestamp = now
else:
elapsed_since_start = now - self.start_timestamp
elapsed_since_last = now - self.last_timestamp
self.bytes_received += len(pdu)
instant_rx_speed = len(pdu) / elapsed_since_last
average_rx_speed = self.bytes_received / elapsed_since_start
print(
color(
f'@@@ RX speed: instant={instant_rx_speed:.4f},'
f' average={average_rx_speed:.4f}',
'cyan',
)
)
self.last_timestamp = now
if self.expected_cid == self.packet_count:
print(color('@@@ Received last packet', 'green'))
self.done.set()
async def run(self):
"""Run a loopback throughput test"""
print(color('>>> Connecting to HCI...', 'green'))
async with await open_transport_or_link(self.transport) as (
hci_source,
hci_sink,
):
print(color('>>> Connected', 'green'))
host = Host(hci_source, hci_sink)
await host.reset()
# make sure data can fit in one l2cap pdu
l2cap_header_size = 4
max_packet_size = host.acl_packet_queue.max_packet_size - l2cap_header_size
if self.packet_size > max_packet_size:
print(
color(
f'!!! Packet size ({self.packet_size}) larger than max supported'
f' size ({max_packet_size})',
'red',
)
)
return
if not host.supports_command(
HCI_WRITE_LOOPBACK_MODE_COMMAND
) or not host.supports_command(HCI_READ_LOOPBACK_MODE_COMMAND):
print(color('!!! Loopback mode not supported', 'red'))
return
# set event callbacks
host.on('connection', self.on_connection)
host.on('l2cap_pdu', self.on_l2cap_pdu)
loopback_mode = LoopbackMode.LOCAL
print(color('### Setting loopback mode', 'blue'))
await host.send_command(
HCI_Write_Loopback_Mode_Command(loopback_mode=LoopbackMode.LOCAL),
check_result=True,
)
print(color('### Checking loopback mode', 'blue'))
response = await host.send_command(
HCI_Read_Loopback_Mode_Command(), check_result=True
)
if response.return_parameters.loopback_mode != loopback_mode:
print(color('!!! Loopback mode mismatch', 'red'))
return
await self.connection_event.wait()
print(color('### Connected', 'cyan'))
print(color('=== Start sending', 'magenta'))
start_time = time.time()
bytes_sent = 0
for cid in range(0, self.packet_count):
# using the cid as an incremental index
host.send_l2cap_pdu(
self.connection_handle, cid, bytes(self.packet_size)
)
print(
color(
f'>>> Sending packet {cid}: {self.packet_size} bytes', 'yellow'
)
)
bytes_sent += self.packet_size # don't count L2CAP or HCI header sizes
await asyncio.sleep(0) # yield to allow packet receive
await self.done.wait()
print(color('=== Done!', 'magenta'))
elapsed = time.time() - start_time
average_tx_speed = bytes_sent / elapsed
print(
color(
f'@@@ TX speed: average={average_tx_speed:.4f} ({bytes_sent} bytes'
f' in {elapsed:.2f} seconds)',
'green',
)
)
# -----------------------------------------------------------------------------
@click.command()
@click.option(
'--packet-size',
'-s',
metavar='SIZE',
type=click.IntRange(8, 4096),
default=500,
help='Packet size',
)
@click.option(
'--packet-count',
'-c',
metavar='COUNT',
type=int,
default=10,
help='Packet count',
)
@click.argument('transport')
def main(packet_size, packet_count, transport):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
loopback = Loopback(packet_size, packet_count, transport)
asyncio.run(loopback.run())
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()

View File

@@ -49,14 +49,16 @@ class ServerBridge:
self.tcp_port = tcp_port
async def start(self, device: Device) -> None:
# Listen for incoming L2CAP CoC connections
# Listen for incoming L2CAP channel connections
device.create_l2cap_server(
spec=l2cap.LeCreditBasedChannelSpec(
psm=self.psm, mtu=self.mtu, mps=self.mps, max_credits=self.max_credits
),
handler=self.on_coc,
handler=self.on_channel,
)
print(
color(f'### Listening for channel connection on PSM {self.psm}', 'yellow')
)
print(color(f'### Listening for CoC connection on PSM {self.psm}', 'yellow'))
def on_ble_connection(connection):
def on_ble_disconnection(reason):
@@ -73,7 +75,7 @@ class ServerBridge:
await device.start_advertising(auto_restart=True)
# Called when a new L2CAP connection is established
def on_coc(self, l2cap_channel):
def on_channel(self, l2cap_channel):
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
class Pipe:
@@ -83,7 +85,7 @@ class ServerBridge:
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_coc_sdu
l2cap_channel.sink = self.on_channel_sdu
async def connect_to_tcp(self):
# Connect to the TCP server
@@ -128,7 +130,7 @@ class ServerBridge:
if self.tcp_transport is not None:
self.tcp_transport.close()
def on_coc_sdu(self, sdu):
def on_channel_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
if self.tcp_transport is None:
print(color('!!! TCP socket not open, dropping', 'red'))
@@ -183,7 +185,7 @@ class ClientBridge:
peer_name = writer.get_extra_info('peer_name')
print(color(f'<<< TCP connection from {peer_name}', 'magenta'))
def on_coc_sdu(sdu):
def on_channel_sdu(sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
l2cap_to_tcp_pipe.write(sdu)
@@ -209,7 +211,7 @@ class ClientBridge:
writer.close()
return
l2cap_channel.sink = on_coc_sdu
l2cap_channel.sink = on_channel_sdu
l2cap_channel.on('close', on_l2cap_close)
# Start a flow control pipe from L2CAP to TCP
@@ -274,23 +276,29 @@ async def run(device_config, hci_transport, bridge):
@click.pass_context
@click.option('--device-config', help='Device configuration file', required=True)
@click.option('--hci-transport', help='HCI transport', required=True)
@click.option('--psm', help='PSM for L2CAP CoC', type=int, default=1234)
@click.option('--psm', help='PSM for L2CAP', type=int, default=1234)
@click.option(
'--l2cap-coc-max-credits',
help='Maximum L2CAP CoC Credits',
'--l2cap-max-credits',
help='Maximum L2CAP Credits',
type=click.IntRange(1, 65535),
default=128,
)
@click.option(
'--l2cap-coc-mtu',
help='L2CAP CoC MTU',
type=click.IntRange(23, 65535),
default=1022,
'--l2cap-mtu',
help='L2CAP MTU',
type=click.IntRange(
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU,
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU,
),
default=1024,
)
@click.option(
'--l2cap-coc-mps',
help='L2CAP CoC MPS',
type=click.IntRange(23, 65533),
'--l2cap-mps',
help='L2CAP MPS',
type=click.IntRange(
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS,
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS,
),
default=1024,
)
def cli(
@@ -298,17 +306,17 @@ def cli(
device_config,
hci_transport,
psm,
l2cap_coc_max_credits,
l2cap_coc_mtu,
l2cap_coc_mps,
l2cap_max_credits,
l2cap_mtu,
l2cap_mps,
):
context.ensure_object(dict)
context.obj['device_config'] = device_config
context.obj['hci_transport'] = hci_transport
context.obj['psm'] = psm
context.obj['max_credits'] = l2cap_coc_max_credits
context.obj['mtu'] = l2cap_coc_mtu
context.obj['mps'] = l2cap_coc_mps
context.obj['max_credits'] = l2cap_max_credits
context.obj['mtu'] = l2cap_mtu
context.obj['mps'] = l2cap_mps
# -----------------------------------------------------------------------------

View File

@@ -26,7 +26,7 @@ from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver
from bumble.device import Advertisement
from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
from bumble.hci import Address, HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
# -----------------------------------------------------------------------------
@@ -66,10 +66,15 @@ class AdvertisementPrinter:
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
address.address_type
]
if address.is_public:
type_color = 'cyan'
if address.address_type in (
Address.RANDOM_IDENTITY_ADDRESS,
Address.PUBLIC_IDENTITY_ADDRESS,
):
type_color = 'yellow'
else:
if address.is_static:
if address.is_public:
type_color = 'cyan'
elif address.is_static:
type_color = 'green'
address_qualifier = '(static)'
elif address.is_resolvable:
@@ -116,6 +121,7 @@ async def scan(
phy,
filter_duplicates,
raw,
irks,
keystore_file,
device_config,
transport,
@@ -140,9 +146,21 @@ async def scan(
if device.keystore:
resolving_keys = await device.keystore.get_resolving_keys()
resolver = AddressResolver(resolving_keys)
else:
resolver = None
resolving_keys = []
for irk_and_address in irks:
if ':' not in irk_and_address:
raise ValueError('invalid IRK:ADDRESS value')
irk_hex, address_str = irk_and_address.split(':', 1)
resolving_keys.append(
(
bytes.fromhex(irk_hex),
Address(address_str, Address.RANDOM_DEVICE_ADDRESS),
)
)
resolver = AddressResolver(resolving_keys) if resolving_keys else None
printer = AdvertisementPrinter(min_rssi, resolver)
if raw:
@@ -187,8 +205,24 @@ async def scan(
default=False,
help='Listen for raw advertising reports instead of processed ones',
)
@click.option('--keystore-file', help='Keystore file to use when resolving addresses')
@click.option('--device-config', help='Device config file for the scanning device')
@click.option(
'--irk',
metavar='<IRK_HEX>:<ADDRESS>',
help=(
'Use this IRK for resolving private addresses ' '(may be used more than once)'
),
multiple=True,
)
@click.option(
'--keystore-file',
metavar='FILE_PATH',
help='Keystore file to use when resolving addresses',
)
@click.option(
'--device-config',
metavar='FILE_PATH',
help='Device config file for the scanning device',
)
@click.argument('transport')
def main(
min_rssi,
@@ -198,6 +232,7 @@ def main(
phy,
filter_duplicates,
raw,
irk,
keystore_file,
device_config,
transport,
@@ -212,6 +247,7 @@ def main(
phy,
filter_duplicates,
raw,
irk,
keystore_file,
device_config,
transport,

View File

@@ -184,8 +184,12 @@ def make_audio_source_service_sdp_records(service_record_handle, version=(1, 3))
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE),
DataElement.unsigned_integer_16(version_int),
DataElement.sequence(
[
DataElement.uuid(BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE),
DataElement.unsigned_integer_16(version_int),
]
)
]
),
),
@@ -234,8 +238,12 @@ def make_audio_sink_service_sdp_records(service_record_handle, version=(1, 3)):
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE),
DataElement.unsigned_integer_16(version_int),
DataElement.sequence(
[
DataElement.uuid(BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE),
DataElement.unsigned_integer_16(version_int),
]
)
]
),
),

View File

@@ -25,9 +25,21 @@
from __future__ import annotations
import enum
import functools
import inspect
import struct
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Type,
Union,
TYPE_CHECKING,
)
from pyee import EventEmitter
from typing import Dict, Type, List, Protocol, Union, Optional, Any, TYPE_CHECKING
from bumble.core import UUID, name_or_number, ProtocolError
from bumble.hci import HCI_Object, key_with_value
@@ -722,12 +734,38 @@ class ATT_Handle_Value_Confirmation(ATT_PDU):
# -----------------------------------------------------------------------------
class ConnectionValue(Protocol):
def read(self, connection) -> bytes:
...
class AttributeValue:
'''
Attribute value where reading and/or writing is delegated to functions
passed as arguments to the constructor.
'''
def write(self, connection, value: bytes) -> None:
...
def __init__(
self,
read: Union[
Callable[[Optional[Connection]], bytes],
Callable[[Optional[Connection]], Awaitable[bytes]],
None,
] = None,
write: Union[
Callable[[Optional[Connection], bytes], None],
Callable[[Optional[Connection], bytes], Awaitable[None]],
None,
] = None,
):
self._read = read
self._write = write
def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]:
return self._read(connection) if self._read else b''
def write(
self, connection: Optional[Connection], value: bytes
) -> Union[Awaitable[None], None]:
if self._write:
return self._write(connection, value)
return None
# -----------------------------------------------------------------------------
@@ -770,13 +808,13 @@ class Attribute(EventEmitter):
READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION
value: Union[str, bytes, ConnectionValue]
value: Union[bytes, AttributeValue]
def __init__(
self,
attribute_type: Union[str, bytes, UUID],
permissions: Union[str, Attribute.Permissions],
value: Union[str, bytes, ConnectionValue] = b'',
value: Union[str, bytes, AttributeValue] = b'',
) -> None:
EventEmitter.__init__(self)
self.handle = 0
@@ -806,7 +844,7 @@ class Attribute(EventEmitter):
def decode_value(self, value_bytes: bytes) -> Any:
return value_bytes
def read_value(self, connection: Optional[Connection]) -> bytes:
async def read_value(self, connection: Optional[Connection]) -> bytes:
if (
(self.permissions & self.READ_REQUIRES_ENCRYPTION)
and connection is not None
@@ -832,6 +870,8 @@ class Attribute(EventEmitter):
if hasattr(self.value, 'read'):
try:
value = self.value.read(connection)
if inspect.isawaitable(value):
value = await value
except ATT_Error as error:
raise ATT_Error(
error_code=error.error_code, att_handle=self.handle
@@ -841,7 +881,7 @@ class Attribute(EventEmitter):
return self.encode_value(value)
def write_value(self, connection: Connection, value_bytes: bytes) -> None:
async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
if (
self.permissions & self.WRITE_REQUIRES_ENCRYPTION
) and not connection.encryption:
@@ -864,7 +904,9 @@ class Attribute(EventEmitter):
if hasattr(self.value, 'write'):
try:
self.value.write(connection, value) # pylint: disable=not-callable
result = self.value.write(connection, value)
if inspect.isawaitable(result):
await result
except ATT_Error as error:
raise ATT_Error(
error_code=error.error_code, att_handle=self.handle

520
bumble/avc.py Normal file
View File

@@ -0,0 +1,520 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import Dict, Type, Union, Tuple
from bumble.utils import OpenIntEnum
# -----------------------------------------------------------------------------
class Frame:
class SubunitType(enum.IntEnum):
# AV/C Digital Interface Command Set General Specification Version 4.1
# Table 7.4
MONITOR = 0x00
AUDIO = 0x01
PRINTER = 0x02
DISC = 0x03
TAPE_RECORDER_OR_PLAYER = 0x04
TUNER = 0x05
CA = 0x06
CAMERA = 0x07
PANEL = 0x09
BULLETIN_BOARD = 0x0A
VENDOR_UNIQUE = 0x1C
EXTENDED = 0x1E
UNIT = 0x1F
class OperationCode(OpenIntEnum):
# 0x00 - 0x0F: Unit and subunit commands
VENDOR_DEPENDENT = 0x00
RESERVE = 0x01
PLUG_INFO = 0x02
# 0x10 - 0x3F: Unit commands
DIGITAL_OUTPUT = 0x10
DIGITAL_INPUT = 0x11
CHANNEL_USAGE = 0x12
OUTPUT_PLUG_SIGNAL_FORMAT = 0x18
INPUT_PLUG_SIGNAL_FORMAT = 0x19
GENERAL_BUS_SETUP = 0x1F
CONNECT_AV = 0x20
DISCONNECT_AV = 0x21
CONNECTIONS = 0x22
CONNECT = 0x24
DISCONNECT = 0x25
UNIT_INFO = 0x30
SUBUNIT_INFO = 0x31
# 0x40 - 0x7F: Subunit commands
PASS_THROUGH = 0x7C
GUI_UPDATE = 0x7D
PUSH_GUI_DATA = 0x7E
USER_ACTION = 0x7F
# 0xA0 - 0xBF: Unit and subunit commands
VERSION = 0xB0
POWER = 0xB2
subunit_type: SubunitType
subunit_id: int
opcode: OperationCode
operands: bytes
@staticmethod
def subclass(subclass):
# Infer the opcode from the class name
if subclass.__name__.endswith("CommandFrame"):
short_name = subclass.__name__.replace("CommandFrame", "")
category_class = CommandFrame
elif subclass.__name__.endswith("ResponseFrame"):
short_name = subclass.__name__.replace("ResponseFrame", "")
category_class = ResponseFrame
else:
raise ValueError(f"invalid subclass name {subclass.__name__}")
uppercase_indexes = [
i for i in range(len(short_name)) if short_name[i].isupper()
]
uppercase_indexes.append(len(short_name))
words = [
short_name[uppercase_indexes[i] : uppercase_indexes[i + 1]].upper()
for i in range(len(uppercase_indexes) - 1)
]
opcode_name = "_".join(words)
opcode = Frame.OperationCode[opcode_name]
category_class.subclasses[opcode] = subclass
return subclass
@staticmethod
def from_bytes(data: bytes) -> Frame:
if data[0] >> 4 != 0:
raise ValueError("first 4 bits must be 0s")
ctype_or_response = data[0] & 0xF
subunit_type = Frame.SubunitType(data[1] >> 3)
subunit_id = data[1] & 7
if subunit_type == Frame.SubunitType.EXTENDED:
# Not supported
raise NotImplementedError("extended subunit types not supported")
if subunit_id < 5:
opcode_offset = 2
elif subunit_id == 5:
# Extended to the next byte
extension = data[2]
if extension == 0:
raise ValueError("extended subunit ID value reserved")
if extension == 0xFF:
subunit_id = 5 + 254 + data[3]
opcode_offset = 4
else:
subunit_id = 5 + extension
opcode_offset = 3
elif subunit_id == 6:
raise ValueError("reserved subunit ID")
opcode = Frame.OperationCode(data[opcode_offset])
operands = data[opcode_offset + 1 :]
# Look for a registered subclass
if ctype_or_response < 8:
# Command
ctype = CommandFrame.CommandType(ctype_or_response)
if c_subclass := CommandFrame.subclasses.get(opcode):
return c_subclass(
ctype,
subunit_type,
subunit_id,
*c_subclass.parse_operands(operands),
)
return CommandFrame(ctype, subunit_type, subunit_id, opcode, operands)
else:
# Response
response = ResponseFrame.ResponseCode(ctype_or_response)
if r_subclass := ResponseFrame.subclasses.get(opcode):
return r_subclass(
response,
subunit_type,
subunit_id,
*r_subclass.parse_operands(operands),
)
return ResponseFrame(response, subunit_type, subunit_id, opcode, operands)
def to_bytes(
self,
ctype_or_response: Union[CommandFrame.CommandType, ResponseFrame.ResponseCode],
) -> bytes:
# TODO: support extended subunit types and ids.
return (
bytes(
[
ctype_or_response,
self.subunit_type << 3 | self.subunit_id,
self.opcode,
]
)
+ self.operands
)
def to_string(self, extra: str) -> str:
return (
f"{self.__class__.__name__}({extra}"
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"opcode={self.opcode.name}, "
f"operands={self.operands.hex()})"
)
def __init__(
self,
subunit_type: SubunitType,
subunit_id: int,
opcode: OperationCode,
operands: bytes,
) -> None:
self.subunit_type = subunit_type
self.subunit_id = subunit_id
self.opcode = opcode
self.operands = operands
# -----------------------------------------------------------------------------
class CommandFrame(Frame):
class CommandType(OpenIntEnum):
# AV/C Digital Interface Command Set General Specification Version 4.1
# Table 7.1
CONTROL = 0x00
STATUS = 0x01
SPECIFIC_INQUIRY = 0x02
NOTIFY = 0x03
GENERAL_INQUIRY = 0x04
subclasses: Dict[Frame.OperationCode, Type[CommandFrame]] = {}
ctype: CommandType
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
raise NotImplementedError
def __init__(
self,
ctype: CommandType,
subunit_type: Frame.SubunitType,
subunit_id: int,
opcode: Frame.OperationCode,
operands: bytes,
) -> None:
super().__init__(subunit_type, subunit_id, opcode, operands)
self.ctype = ctype
def __bytes__(self):
return self.to_bytes(self.ctype)
def __str__(self):
return self.to_string(f"ctype={self.ctype.name}, ")
# -----------------------------------------------------------------------------
class ResponseFrame(Frame):
class ResponseCode(OpenIntEnum):
# AV/C Digital Interface Command Set General Specification Version 4.1
# Table 7.2
NOT_IMPLEMENTED = 0x08
ACCEPTED = 0x09
REJECTED = 0x0A
IN_TRANSITION = 0x0B
IMPLEMENTED_OR_STABLE = 0x0C
CHANGED = 0x0D
INTERIM = 0x0F
subclasses: Dict[Frame.OperationCode, Type[ResponseFrame]] = {}
response: ResponseCode
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
raise NotImplementedError
def __init__(
self,
response: ResponseCode,
subunit_type: Frame.SubunitType,
subunit_id: int,
opcode: Frame.OperationCode,
operands: bytes,
) -> None:
super().__init__(subunit_type, subunit_id, opcode, operands)
self.response = response
def __bytes__(self):
return self.to_bytes(self.response)
def __str__(self):
return self.to_string(f"response={self.response.name}, ")
# -----------------------------------------------------------------------------
class VendorDependentFrame:
company_id: int
vendor_dependent_data: bytes
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
return (
struct.unpack(">I", b"\x00" + operands[:3])[0],
operands[3:],
)
def make_operands(self) -> bytes:
return struct.pack(">I", self.company_id)[1:] + self.vendor_dependent_data
def __init__(self, company_id: int, vendor_dependent_data: bytes):
self.company_id = company_id
self.vendor_dependent_data = vendor_dependent_data
# -----------------------------------------------------------------------------
@Frame.subclass
class VendorDependentCommandFrame(VendorDependentFrame, CommandFrame):
def __init__(
self,
ctype: CommandFrame.CommandType,
subunit_type: Frame.SubunitType,
subunit_id: int,
company_id: int,
vendor_dependent_data: bytes,
) -> None:
VendorDependentFrame.__init__(self, company_id, vendor_dependent_data)
CommandFrame.__init__(
self,
ctype,
subunit_type,
subunit_id,
Frame.OperationCode.VENDOR_DEPENDENT,
self.make_operands(),
)
def __str__(self):
return (
f"VendorDependentCommandFrame(ctype={self.ctype.name}, "
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"company_id=0x{self.company_id:06X}, "
f"vendor_dependent_data={self.vendor_dependent_data.hex()})"
)
# -----------------------------------------------------------------------------
@Frame.subclass
class VendorDependentResponseFrame(VendorDependentFrame, ResponseFrame):
def __init__(
self,
response: ResponseFrame.ResponseCode,
subunit_type: Frame.SubunitType,
subunit_id: int,
company_id: int,
vendor_dependent_data: bytes,
) -> None:
VendorDependentFrame.__init__(self, company_id, vendor_dependent_data)
ResponseFrame.__init__(
self,
response,
subunit_type,
subunit_id,
Frame.OperationCode.VENDOR_DEPENDENT,
self.make_operands(),
)
def __str__(self):
return (
f"VendorDependentResponseFrame(response={self.response.name}, "
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"company_id=0x{self.company_id:06X}, "
f"vendor_dependent_data={self.vendor_dependent_data.hex()})"
)
# -----------------------------------------------------------------------------
class PassThroughFrame:
"""
See AV/C Panel Subunit Specification 1.1 - 9.4 PASS THROUGH control command
"""
class StateFlag(enum.IntEnum):
PRESSED = 0
RELEASED = 1
class OperationId(OpenIntEnum):
SELECT = 0x00
UP = 0x01
DOWN = 0x01
LEFT = 0x03
RIGHT = 0x04
RIGHT_UP = 0x05
RIGHT_DOWN = 0x06
LEFT_UP = 0x07
LEFT_DOWN = 0x08
ROOT_MENU = 0x09
SETUP_MENU = 0x0A
CONTENTS_MENU = 0x0B
FAVORITE_MENU = 0x0C
EXIT = 0x0D
NUMBER_0 = 0x20
NUMBER_1 = 0x21
NUMBER_2 = 0x22
NUMBER_3 = 0x23
NUMBER_4 = 0x24
NUMBER_5 = 0x25
NUMBER_6 = 0x26
NUMBER_7 = 0x27
NUMBER_8 = 0x28
NUMBER_9 = 0x29
DOT = 0x2A
ENTER = 0x2B
CLEAR = 0x2C
CHANNEL_UP = 0x30
CHANNEL_DOWN = 0x31
PREVIOUS_CHANNEL = 0x32
SOUND_SELECT = 0x33
INPUT_SELECT = 0x34
DISPLAY_INFORMATION = 0x35
HELP = 0x36
PAGE_UP = 0x37
PAGE_DOWN = 0x38
POWER = 0x40
VOLUME_UP = 0x41
VOLUME_DOWN = 0x42
MUTE = 0x43
PLAY = 0x44
STOP = 0x45
PAUSE = 0x46
RECORD = 0x47
REWIND = 0x48
FAST_FORWARD = 0x49
EJECT = 0x4A
FORWARD = 0x4B
BACKWARD = 0x4C
ANGLE = 0x50
SUBPICTURE = 0x51
F1 = 0x71
F2 = 0x72
F3 = 0x73
F4 = 0x74
F5 = 0x75
VENDOR_UNIQUE = 0x7E
state_flag: StateFlag
operation_id: OperationId
operation_data: bytes
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
return (
PassThroughFrame.StateFlag(operands[0] >> 7),
PassThroughFrame.OperationId(operands[0] & 0x7F),
operands[1 : 1 + operands[1]],
)
def make_operands(self):
return (
bytes([self.state_flag << 7 | self.operation_id, len(self.operation_data)])
+ self.operation_data
)
def __init__(
self,
state_flag: StateFlag,
operation_id: OperationId,
operation_data: bytes,
) -> None:
if len(operation_data) > 255:
raise ValueError("operation data must be <= 255 bytes")
self.state_flag = state_flag
self.operation_id = operation_id
self.operation_data = operation_data
# -----------------------------------------------------------------------------
@Frame.subclass
class PassThroughCommandFrame(PassThroughFrame, CommandFrame):
def __init__(
self,
ctype: CommandFrame.CommandType,
subunit_type: Frame.SubunitType,
subunit_id: int,
state_flag: PassThroughFrame.StateFlag,
operation_id: PassThroughFrame.OperationId,
operation_data: bytes,
) -> None:
PassThroughFrame.__init__(self, state_flag, operation_id, operation_data)
CommandFrame.__init__(
self,
ctype,
subunit_type,
subunit_id,
Frame.OperationCode.PASS_THROUGH,
self.make_operands(),
)
def __str__(self):
return (
f"PassThroughCommandFrame(ctype={self.ctype.name}, "
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"state_flag={self.state_flag.name}, "
f"operation_id={self.operation_id.name}, "
f"operation_data={self.operation_data.hex()})"
)
# -----------------------------------------------------------------------------
@Frame.subclass
class PassThroughResponseFrame(PassThroughFrame, ResponseFrame):
def __init__(
self,
response: ResponseFrame.ResponseCode,
subunit_type: Frame.SubunitType,
subunit_id: int,
state_flag: PassThroughFrame.StateFlag,
operation_id: PassThroughFrame.OperationId,
operation_data: bytes,
) -> None:
PassThroughFrame.__init__(self, state_flag, operation_id, operation_data)
ResponseFrame.__init__(
self,
response,
subunit_type,
subunit_id,
Frame.OperationCode.PASS_THROUGH,
self.make_operands(),
)
def __str__(self):
return (
f"PassThroughResponseFrame(response={self.response.name}, "
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"state_flag={self.state_flag.name}, "
f"operation_id={self.operation_id.name}, "
f"operation_data={self.operation_data.hex()})"
)

291
bumble/avctp.py Normal file
View File

@@ -0,0 +1,291 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from enum import IntEnum
import logging
import struct
from typing import Callable, cast, Dict, Optional
from bumble.colors import color
from bumble import avc
from bumble import l2cap
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
AVCTP_PSM = 0x0017
AVCTP_BROWSING_PSM = 0x001B
# -----------------------------------------------------------------------------
class MessageAssembler:
Callback = Callable[[int, bool, bool, int, bytes], None]
transaction_label: int
pid: int
c_r: int
ipid: int
payload: bytes
number_of_packets: int
packets_received: int
def __init__(self, callback: Callback) -> None:
self.callback = callback
self.reset()
def reset(self) -> None:
self.packets_received = 0
self.transaction_label = -1
self.pid = -1
self.c_r = -1
self.ipid = -1
self.payload = b''
self.number_of_packets = 0
self.packet_count = 0
def on_pdu(self, pdu: bytes) -> None:
self.packets_received += 1
transaction_label = pdu[0] >> 4
packet_type = Protocol.PacketType((pdu[0] >> 2) & 3)
c_r = (pdu[0] >> 1) & 1
ipid = pdu[0] & 1
if c_r == 0 and ipid != 0:
logger.warning("invalid IPID in command frame")
self.reset()
return
pid_offset = 1
if packet_type in (Protocol.PacketType.SINGLE, Protocol.PacketType.START):
if self.transaction_label >= 0:
# We are already in a transaction
logger.warning("received START or SINGLE fragment while in transaction")
self.reset()
self.packets_received = 1
if packet_type == Protocol.PacketType.START:
self.number_of_packets = pdu[1]
pid_offset = 2
pid = struct.unpack_from(">H", pdu, pid_offset)[0]
self.payload += pdu[pid_offset + 2 :]
if packet_type in (Protocol.PacketType.CONTINUE, Protocol.PacketType.END):
if transaction_label != self.transaction_label:
logger.warning("transaction label does not match")
self.reset()
return
if pid != self.pid:
logger.warning("PID does not match")
self.reset()
return
if c_r != self.c_r:
logger.warning("C/R does not match")
self.reset()
return
if self.packets_received > self.number_of_packets:
logger.warning("too many fragments in transaction")
self.reset()
return
if packet_type == Protocol.PacketType.END:
if self.packets_received != self.number_of_packets:
logger.warning("premature END")
self.reset()
return
else:
self.transaction_label = transaction_label
self.c_r = c_r
self.ipid = ipid
self.pid = pid
if packet_type in (Protocol.PacketType.SINGLE, Protocol.PacketType.END):
self.on_message_complete()
def on_message_complete(self):
try:
self.callback(
self.transaction_label,
self.c_r == 0,
self.ipid != 0,
self.pid,
self.payload,
)
except Exception as error:
logger.exception(color(f"!!! exception in callback: {error}", "red"))
self.reset()
# -----------------------------------------------------------------------------
class Protocol:
CommandHandler = Callable[[int, avc.CommandFrame], None]
command_handlers: Dict[int, CommandHandler] # Command handlers, by PID
ResponseHandler = Callable[[int, Optional[avc.ResponseFrame]], None]
response_handlers: Dict[int, ResponseHandler] # Response handlers, by PID
next_transaction_label: int
message_assembler: MessageAssembler
class PacketType(IntEnum):
SINGLE = 0b00
START = 0b01
CONTINUE = 0b10
END = 0b11
def __init__(self, l2cap_channel: l2cap.ClassicChannel) -> None:
self.command_handlers = {}
self.response_handlers = {}
self.l2cap_channel = l2cap_channel
self.message_assembler = MessageAssembler(self.on_message)
# Register to receive PDUs from the channel
l2cap_channel.sink = self.on_pdu
l2cap_channel.on("open", self.on_l2cap_channel_open)
l2cap_channel.on("close", self.on_l2cap_channel_close)
def on_l2cap_channel_open(self):
logger.debug(color("<<< AVCTP channel open", "magenta"))
def on_l2cap_channel_close(self):
logger.debug(color("<<< AVCTP channel closed", "magenta"))
def on_pdu(self, pdu: bytes) -> None:
self.message_assembler.on_pdu(pdu)
def on_message(
self,
transaction_label: int,
is_command: bool,
ipid: bool,
pid: int,
payload: bytes,
) -> None:
logger.debug(
f"<<< AVCTP Message: pid={pid}, "
f"transaction_label={transaction_label}, "
f"is_command={is_command}, "
f"ipid={ipid}, "
f"payload={payload.hex()}"
)
# Check for invalid PID responses.
if ipid:
logger.debug(f"received IPID for PID={pid}")
# Find the appropriate handler.
if is_command:
if pid not in self.command_handlers:
logger.warning(f"no command handler for PID {pid}")
self.send_ipid(transaction_label, pid)
return
command_frame = cast(avc.CommandFrame, avc.Frame.from_bytes(payload))
self.command_handlers[pid](transaction_label, command_frame)
else:
if pid not in self.response_handlers:
logger.warning(f"no response handler for PID {pid}")
return
# By convention, for an ipid, send a None payload to the response handler.
if ipid:
response_frame = None
else:
response_frame = cast(avc.ResponseFrame, avc.Frame.from_bytes(payload))
self.response_handlers[pid](transaction_label, response_frame)
def send_message(
self,
transaction_label: int,
is_command: bool,
ipid: bool,
pid: int,
payload: bytes,
):
# TODO: fragment large messages
packet_type = Protocol.PacketType.SINGLE
pdu = (
struct.pack(
">BH",
transaction_label << 4
| packet_type << 2
| (0 if is_command else 1) << 1
| (1 if ipid else 0),
pid,
)
+ payload
)
self.l2cap_channel.send_pdu(pdu)
def send_command(self, transaction_label: int, pid: int, payload: bytes) -> None:
logger.debug(
">>> AVCTP command: "
f"transaction_label={transaction_label}, "
f"pid={pid}, "
f"payload={payload.hex()}"
)
self.send_message(transaction_label, True, False, pid, payload)
def send_response(self, transaction_label: int, pid: int, payload: bytes):
logger.debug(
">>> AVCTP response: "
f"transaction_label={transaction_label}, "
f"pid={pid}, "
f"payload={payload.hex()}"
)
self.send_message(transaction_label, False, False, pid, payload)
def send_ipid(self, transaction_label: int, pid: int) -> None:
logger.debug(
">>> AVCTP ipid: " f"transaction_label={transaction_label}, " f"pid={pid}"
)
self.send_message(transaction_label, False, True, pid, b'')
def register_command_handler(
self, pid: int, handler: Protocol.CommandHandler
) -> None:
self.command_handlers[pid] = handler
def unregister_command_handler(
self, pid: int, handler: Protocol.CommandHandler
) -> None:
if pid not in self.command_handlers or self.command_handlers[pid] != handler:
raise ValueError("command handler not registered")
del self.command_handlers[pid]
def register_response_handler(
self, pid: int, handler: Protocol.ResponseHandler
) -> None:
self.response_handlers[pid] = handler
def unregister_response_handler(
self, pid: int, handler: Protocol.ResponseHandler
) -> None:
if pid not in self.response_handlers or self.response_handlers[pid] != handler:
raise ValueError("response handler not registered")
del self.response_handlers[pid]

View File

@@ -241,7 +241,10 @@ async def find_avdtp_service_with_sdp_client(
)
if profile_descriptor_list:
for profile_descriptor in profile_descriptor_list.value:
if len(profile_descriptor.value) >= 2:
if (
profile_descriptor.type == sdp.DataElement.SEQUENCE
and len(profile_descriptor.value) >= 2
):
avdtp_version_major = profile_descriptor.value[1].value >> 8
avdtp_version_minor = profile_descriptor.value[1].value & 0xFF
return (avdtp_version_major, avdtp_version_minor)
@@ -511,7 +514,8 @@ class MessageAssembler:
try:
self.callback(self.transaction_label, message)
except Exception as error:
logger.warning(color(f'!!! exception in callback: {error}'))
logger.exception(color(f'!!! exception in callback: {error}', 'red'))
self.reset()

1916
bumble/avrcp.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -19,6 +19,7 @@ from __future__ import annotations
import logging
import asyncio
import dataclasses
import itertools
import random
import struct
@@ -42,6 +43,7 @@ from bumble.hci import (
HCI_LE_1M_PHY,
HCI_SUCCESS,
HCI_UNKNOWN_HCI_COMMAND_ERROR,
HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
HCI_VERSION_BLUETOOTH_CORE_5_0,
Address,
@@ -53,17 +55,21 @@ from bumble.hci import (
HCI_Connection_Request_Event,
HCI_Disconnection_Complete_Event,
HCI_Encryption_Change_Event,
HCI_Synchronous_Connection_Complete_Event,
HCI_LE_Advertising_Report_Event,
HCI_LE_CIS_Established_Event,
HCI_LE_CIS_Request_Event,
HCI_LE_Connection_Complete_Event,
HCI_LE_Read_Remote_Features_Complete_Event,
HCI_Number_Of_Completed_Packets_Event,
HCI_Packet,
HCI_Role_Change_Event,
)
from typing import Optional, Union, Dict, TYPE_CHECKING
from typing import Optional, Union, Dict, Any, TYPE_CHECKING
if TYPE_CHECKING:
from bumble.transport.common import TransportSink, TransportSource
from bumble.link import LocalLink
from bumble.transport.common import TransportSink
# -----------------------------------------------------------------------------
# Logging
@@ -79,15 +85,27 @@ class DataObject:
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
handle: int
cis_id: int
cig_id: int
acl_connection: Optional[Connection] = None
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class Connection:
def __init__(self, controller, handle, role, peer_address, link, transport):
self.controller = controller
self.handle = handle
self.role = role
self.peer_address = peer_address
self.link = link
controller: Controller
handle: int
role: int
peer_address: Address
link: Any
transport: int
link_type: int
def __post_init__(self):
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
def on_hci_acl_data_packet(self, packet):
self.assembler.feed_packet(packet)
@@ -106,10 +124,10 @@ class Connection:
class Controller:
def __init__(
self,
name,
name: str,
host_source=None,
host_sink: Optional[TransportSink] = None,
link=None,
link: Optional[LocalLink] = None,
public_address: Optional[Union[bytes, str, Address]] = None,
):
self.name = name
@@ -125,6 +143,8 @@ class Controller:
self.classic_connections: Dict[
Address, Connection
] = {} # Connections in BR/EDR
self.central_cis_links: Dict[int, CisLink] = {} # CIS links by handle
self.peripheral_cis_links: Dict[int, CisLink] = {} # CIS links by handle
self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
self.hci_revision = 0
@@ -134,12 +154,14 @@ class Controller:
'0000000060000000'
) # BR/EDR Not Supported, LE Supported (Controller)
self.manufacturer_name = 0xFFFF
self.hc_data_packet_length = 27
self.hc_total_num_data_packets = 64
self.hc_le_data_packet_length = 27
self.hc_total_num_le_data_packets = 64
self.event_mask = 0
self.event_mask_page_2 = 0
self.supported_commands = bytes.fromhex(
'2000800000c000000000e40000002822000000000000040000f7ffff7f000000'
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004000000000000000000000000000000000000000000000000'
)
self.le_event_mask = 0
@@ -301,7 +323,7 @@ class Controller:
############################################################
# Link connections
############################################################
def allocate_connection_handle(self):
def allocate_connection_handle(self) -> int:
handle = 0
max_handle = 0
for connection in itertools.chain(
@@ -313,6 +335,13 @@ class Controller:
if connection.handle == handle:
# Already used, continue searching after the current max
handle = max_handle + 1
for cis_handle in itertools.chain(
self.central_cis_links.keys(), self.peripheral_cis_links.keys()
):
max_handle = max(max_handle, cis_handle)
if cis_handle == handle:
# Already used, continue searching after the current max
handle = max_handle + 1
return handle
def find_le_connection_by_address(self, address):
@@ -357,12 +386,13 @@ class Controller:
if connection is None:
connection_handle = self.allocate_connection_handle()
connection = Connection(
self,
connection_handle,
BT_PERIPHERAL_ROLE,
peer_address,
self.link,
BT_LE_TRANSPORT,
controller=self,
handle=connection_handle,
role=BT_PERIPHERAL_ROLE,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
self.peripheral_connections[peer_address] = connection
logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}')
@@ -416,12 +446,13 @@ class Controller:
if connection is None:
connection_handle = self.allocate_connection_handle()
connection = Connection(
self,
connection_handle,
BT_CENTRAL_ROLE,
peer_address,
self.link,
BT_LE_TRANSPORT,
controller=self,
handle=connection_handle,
role=BT_CENTRAL_ROLE,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
self.central_connections[peer_address] = connection
logger.debug(
@@ -538,6 +569,104 @@ class Controller:
)
self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))
def on_link_cis_request(
self, central_address: Address, cig_id: int, cis_id: int
) -> None:
'''
Called when an incoming CIS request occurs from a central on the link
'''
connection = self.peripheral_connections.get(central_address)
assert connection
pending_cis_link = CisLink(
handle=self.allocate_connection_handle(),
cis_id=cis_id,
cig_id=cig_id,
acl_connection=connection,
)
self.peripheral_cis_links[pending_cis_link.handle] = pending_cis_link
self.send_hci_packet(
HCI_LE_CIS_Request_Event(
acl_connection_handle=connection.handle,
cis_connection_handle=pending_cis_link.handle,
cig_id=cig_id,
cis_id=cis_id,
)
)
def on_link_cis_established(self, cig_id: int, cis_id: int) -> None:
'''
Called when an incoming CIS established.
'''
cis_link = next(
cis_link
for cis_link in itertools.chain(
self.central_cis_links.values(), self.peripheral_cis_links.values()
)
if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
)
self.send_hci_packet(
HCI_LE_CIS_Established_Event(
status=HCI_SUCCESS,
connection_handle=cis_link.handle,
# CIS parameters are ignored.
cig_sync_delay=0,
cis_sync_delay=0,
transport_latency_c_to_p=0,
transport_latency_p_to_c=0,
phy_c_to_p=0,
phy_p_to_c=0,
nse=0,
bn_c_to_p=0,
bn_p_to_c=0,
ft_c_to_p=0,
ft_p_to_c=0,
max_pdu_c_to_p=0,
max_pdu_p_to_c=0,
iso_interval=0,
)
)
def on_link_cis_disconnected(self, cig_id: int, cis_id: int) -> None:
'''
Called when a CIS disconnected.
'''
if cis_link := next(
(
cis_link
for cis_link in self.peripheral_cis_links.values()
if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
),
None,
):
# Remove peripheral CIS on disconnection.
self.peripheral_cis_links.pop(cis_link.handle)
elif cis_link := next(
(
cis_link
for cis_link in self.central_cis_links.values()
if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
),
None,
):
# Keep central CIS on disconnection. They should be removed by HCI_LE_Remove_CIG_Command.
cis_link.acl_connection = None
else:
return
self.send_hci_packet(
HCI_Disconnection_Complete_Event(
status=HCI_SUCCESS,
connection_handle=cis_link.handle,
reason=HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
)
)
############################################################
# Classic link connections
############################################################
@@ -566,6 +695,7 @@ class Controller:
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
self.classic_connections[peer_address] = connection
logger.debug(
@@ -619,6 +749,42 @@ class Controller:
)
)
def on_classic_sco_connection_complete(
self, peer_address: Address, status: int, link_type: int
):
if status == HCI_SUCCESS:
# Allocate (or reuse) a connection handle
connection_handle = self.allocate_connection_handle()
connection = Connection(
controller=self,
handle=connection_handle,
# Role doesn't matter in SCO.
role=BT_CENTRAL_ROLE,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
link_type=link_type,
)
self.classic_connections[peer_address] = connection
logger.debug(f'New SCO connection handle: 0x{connection_handle:04X}')
else:
connection_handle = 0
self.send_hci_packet(
HCI_Synchronous_Connection_Complete_Event(
status=status,
connection_handle=connection_handle,
bd_addr=peer_address,
link_type=link_type,
# TODO: Provide SCO connection parameters.
transmission_interval=0,
retransmission_window=0,
rx_packet_length=0,
tx_packet_length=0,
air_mode=0,
)
)
############################################################
# Advertising support
############################################################
@@ -721,6 +887,17 @@ class Controller:
else:
# Remove the connection
del self.classic_connections[connection.peer_address]
elif cis_link := (
self.central_cis_links.get(handle) or self.peripheral_cis_links.get(handle)
):
if self.link:
self.link.disconnect_cis(
initiator_controller=self,
peer_address=cis_link.acl_connection.peer_address,
cig_id=cis_link.cig_id,
cis_id=cis_link.cis_id,
)
# Spec requires handle to be kept after disconnection.
def on_hci_accept_connection_request_command(self, command):
'''
@@ -738,6 +915,68 @@ class Controller:
)
self.link.classic_accept_connection(self, command.bd_addr, command.role)
def on_hci_enhanced_setup_synchronous_connection_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.1.45 Enhanced Setup Synchronous Connection command
'''
if self.link is None:
return
if not (
connection := self.find_classic_connection_by_handle(
command.connection_handle
)
):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.link.classic_sco_connect(
self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
)
def on_hci_enhanced_accept_synchronous_connection_request_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.1.46 Enhanced Accept Synchronous Connection Request command
'''
if self.link is None:
return
if not (connection := self.find_classic_connection_by_address(command.bd_addr)):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.link.classic_accept_sco_connection(
self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
)
def on_hci_switch_role_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
@@ -914,6 +1153,19 @@ class Controller:
'''
return bytes([HCI_SUCCESS]) + self.lmp_features
def on_hci_read_buffer_size_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.4.5 Read Buffer Size Command
'''
return struct.pack(
'<BHBHH',
HCI_SUCCESS,
self.hc_data_packet_length,
0,
self.hc_total_num_data_packets,
0,
)
def on_hci_read_bd_addr_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
@@ -1089,6 +1341,18 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command
'''
handle = command.connection_handle
if not self.find_connection_by_handle(handle):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
# First, say that the command is pending
self.send_hci_packet(
HCI_Command_Status_Event(
@@ -1102,7 +1366,7 @@ class Controller:
self.send_hci_packet(
HCI_LE_Read_Remote_Features_Complete_Event(
status=HCI_SUCCESS,
connection_handle=0,
connection_handle=handle,
le_features=bytes.fromhex('dd40000000000000'),
)
)
@@ -1264,6 +1528,107 @@ class Controller:
'''
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)
def on_hci_le_set_cig_parameters_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.97 LE Set CIG Parameter Command
'''
# Remove old CIG implicitly.
for handle, cis_link in self.central_cis_links.items():
if cis_link.cig_id == command.cig_id:
self.central_cis_links.pop(handle)
handles = []
for cis_id in command.cis_id:
handle = self.allocate_connection_handle()
handles.append(handle)
self.central_cis_links[handle] = CisLink(
cis_id=cis_id,
cig_id=command.cig_id,
handle=handle,
)
return struct.pack(
'<BBB', HCI_SUCCESS, command.cig_id, len(handles)
) + b''.join([struct.pack('<H', handle) for handle in handles])
def on_hci_le_create_cis_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.99 LE Create CIS Command
'''
if not self.link:
return
for cis_handle, acl_handle in zip(
command.cis_connection_handle, command.acl_connection_handle
):
if not (connection := self.find_connection_by_handle(acl_handle)):
logger.error(f'Cannot find connection with handle={acl_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
if not (cis_link := self.central_cis_links.get(cis_handle)):
logger.error(f'Cannot find CIS with handle={cis_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
cis_link.acl_connection = connection
self.link.create_cis(
self,
peripheral_address=connection.peer_address,
cig_id=cis_link.cig_id,
cis_id=cis_link.cis_id,
)
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
def on_hci_le_remove_cig_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.100 LE Remove CIG Command
'''
status = HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR
for cis_handle, cis_link in self.central_cis_links.items():
if cis_link.cig_id == command.cig_id:
self.central_cis_links.pop(cis_handle)
status = HCI_SUCCESS
return struct.pack('<BH', status, command.cig_id)
def on_hci_le_accept_cis_request_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.101 LE Accept CIS Request Command
'''
if not self.link:
return
if not (
pending_cis_link := self.peripheral_cis_links.get(command.connection_handle)
):
logger.error(f'Cannot find CIS with handle={command.connection_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
assert pending_cis_link.acl_connection
self.link.accept_cis(
peripheral_controller=self,
central_address=pending_cis_link.acl_connection.peer_address,
cig_id=pending_cis_link.cig_id,
cis_id=pending_cis_link.cis_id,
)
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
def on_hci_le_setup_iso_data_path_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command

View File

@@ -97,12 +97,16 @@ class BaseError(Exception):
namespace = f'{self.error_namespace}/'
else:
namespace = ''
error_text = {
(True, True): f'{self.error_name} [0x{self.error_code:X}]',
(True, False): self.error_name,
(False, True): f'0x{self.error_code:X}',
(False, False): '',
}[(self.error_name != '', self.error_code is not None)]
have_name = self.error_name != ''
have_code = self.error_code is not None
if have_name and have_code:
error_text = f'{self.error_name} [0x{self.error_code:X}]'
elif have_name and not have_code:
error_text = self.error_name
elif not have_name and have_code:
error_text = f'0x{self.error_code:X}'
else:
error_text = '<unspecified>'
return f'{type(self).__name__}({namespace}{error_text})'
@@ -319,7 +323,7 @@ BT_HIDP_PROTOCOL_ID = UUID.from_16_bits(0x0011, 'HIDP')
BT_HARDCOPY_CONTROL_CHANNEL_PROTOCOL_ID = UUID.from_16_bits(0x0012, 'HardcopyControlChannel')
BT_HARDCOPY_DATA_CHANNEL_PROTOCOL_ID = UUID.from_16_bits(0x0014, 'HardcopyDataChannel')
BT_HARDCOPY_NOTIFICATION_PROTOCOL_ID = UUID.from_16_bits(0x0016, 'HardcopyNotification')
BT_AVTCP_PROTOCOL_ID = UUID.from_16_bits(0x0017, 'AVCTP')
BT_AVCTP_PROTOCOL_ID = UUID.from_16_bits(0x0017, 'AVCTP')
BT_AVDTP_PROTOCOL_ID = UUID.from_16_bits(0x0019, 'AVDTP')
BT_CMTP_PROTOCOL_ID = UUID.from_16_bits(0x001B, 'CMTP')
BT_MCAP_CONTROL_CHANNEL_PROTOCOL_ID = UUID.from_16_bits(0x001E, 'MCAPControlChannel')
@@ -821,8 +825,8 @@ class AdvertisingData:
ad_structures = []
self.ad_structures = ad_structures[:]
@staticmethod
def from_bytes(data):
@classmethod
def from_bytes(cls, data: bytes) -> AdvertisingData:
instance = AdvertisingData()
instance.append(data)
return instance
@@ -978,7 +982,7 @@ class AdvertisingData:
return ad_data
def append(self, data):
def append(self, data: bytes) -> None:
offset = 0
while offset + 1 < len(data):
length = data[offset]

File diff suppressed because it is too large Load Diff

View File

@@ -23,16 +23,28 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import enum
import functools
import logging
import struct
from typing import Optional, Sequence, Iterable, List, Union
from typing import (
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
Union,
TYPE_CHECKING,
)
from .colors import color
from .core import UUID, get_dict_key_by_value
from .att import Attribute
from bumble.colors import color
from bumble.core import UUID
from bumble.att import Attribute, AttributeValue
if TYPE_CHECKING:
from bumble.gatt_client import AttributeProxy
from bumble.device import Connection
# -----------------------------------------------------------------------------
@@ -522,56 +534,43 @@ class CharacteristicDeclaration(Attribute):
# -----------------------------------------------------------------------------
class CharacteristicValue:
'''
Characteristic value where reading and/or writing is delegated to functions
passed as arguments to the constructor.
'''
def __init__(self, read=None, write=None):
self._read = read
self._write = write
def read(self, connection):
return self._read(connection) if self._read else b''
def write(self, connection, value):
if self._write:
self._write(connection, value)
class CharacteristicValue(AttributeValue):
"""Same as AttributeValue, for backward compatibility"""
# -----------------------------------------------------------------------------
class CharacteristicAdapter:
'''
An adapter that can adapt any object with `read_value` and `write_value`
methods (like Characteristic and CharacteristicProxy objects) by wrapping
those methods with ones that return/accept encoded/decoded values.
Objects with async methods are considered proxies, so the adaptation is one
where the return value of `read_value` is decoded and the value passed to
`write_value` is encoded. Other objects are considered local characteristics
so the adaptation is one where the return value of `read_value` is encoded
and the value passed to `write_value` is decoded.
If the characteristic has a `subscribe` method, it is wrapped with one where
the values are decoded before being passed to the subscriber.
An adapter that can adapt Characteristic and AttributeProxy objects
by wrapping their `read_value()` and `write_value()` methods with ones that
return/accept encoded/decoded values.
For proxies (i.e used by a GATT client), the adaptation is one where the return
value of `read_value()` is decoded and the value passed to `write_value()` is
encoded. The `subscribe()` method, is wrapped with one where the values are decoded
before being passed to the subscriber.
For local values (i.e hosted by a GATT server) the adaptation is one where the
return value of `read_value()` is encoded and the value passed to `write_value()`
is decoded.
'''
def __init__(self, characteristic):
self.wrapped_characteristic = characteristic
self.subscribers = {} # Map from subscriber to proxy subscriber
read_value: Callable
write_value: Callable
if asyncio.iscoroutinefunction(
characteristic.read_value
) and asyncio.iscoroutinefunction(characteristic.write_value):
self.read_value = self.read_decoded_value
self.write_value = self.write_decoded_value
else:
def __init__(self, characteristic: Union[Characteristic, AttributeProxy]):
self.wrapped_characteristic = characteristic
self.subscribers: Dict[
Callable, Callable
] = {} # Map from subscriber to proxy subscriber
if isinstance(characteristic, Characteristic):
self.read_value = self.read_encoded_value
self.write_value = self.write_encoded_value
if hasattr(self.wrapped_characteristic, 'subscribe'):
else:
self.read_value = self.read_decoded_value
self.write_value = self.write_decoded_value
self.subscribe = self.wrapped_subscribe
if hasattr(self.wrapped_characteristic, 'unsubscribe'):
self.unsubscribe = self.wrapped_unsubscribe
def __getattr__(self, name):
@@ -590,11 +589,13 @@ class CharacteristicAdapter:
else:
setattr(self.wrapped_characteristic, name, value)
def read_encoded_value(self, connection):
return self.encode_value(self.wrapped_characteristic.read_value(connection))
async def read_encoded_value(self, connection):
return self.encode_value(
await self.wrapped_characteristic.read_value(connection)
)
def write_encoded_value(self, connection, value):
return self.wrapped_characteristic.write_value(
async def write_encoded_value(self, connection, value):
return await self.wrapped_characteristic.write_value(
connection, self.decode_value(value)
)
@@ -729,13 +730,24 @@ class Descriptor(Attribute):
'''
def __str__(self) -> str:
if isinstance(self.value, bytes):
value_str = self.value.hex()
elif isinstance(self.value, CharacteristicValue):
value = self.value.read(None)
if isinstance(value, bytes):
value_str = value.hex()
else:
value_str = '<async>'
else:
value_str = '<...>'
return (
f'Descriptor(handle=0x{self.handle:04X}, '
f'type={self.type}, '
f'value={self.read_value(None).hex()})'
f'value={value_str})'
)
# -----------------------------------------------------------------------------
class ClientCharacteristicConfigurationBits(enum.IntFlag):
'''
See Vol 3, Part G - 3.3.3.3 - Table 3.11 Client Characteristic Configuration bit

View File

@@ -1068,7 +1068,7 @@ class Client:
logger.warning('!!! unexpected response, there is no pending request')
return
# Sanity check: the response should match the pending request unless it is
# The response should match the pending request unless it is
# an error response
if att_pdu.op_code != ATT_ERROR_RESPONSE:
expected_response_name = self.pending_request.name.replace(

View File

@@ -31,9 +31,9 @@ import struct
from typing import List, Tuple, Optional, TypeVar, Type, Dict, Iterable, TYPE_CHECKING
from pyee import EventEmitter
from .colors import color
from .core import UUID
from .att import (
from bumble.colors import color
from bumble.core import UUID
from bumble.att import (
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_ATTRIBUTE_NOT_LONG_ERROR,
ATT_CID,
@@ -60,7 +60,7 @@ from .att import (
ATT_Write_Response,
Attribute,
)
from .gatt import (
from bumble.gatt import (
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_MAX_ATTRIBUTE_VALUE_SIZE,
@@ -74,6 +74,7 @@ from .gatt import (
Descriptor,
Service,
)
from bumble.utils import AsyncRunner
if TYPE_CHECKING:
from bumble.device import Device, Connection
@@ -327,7 +328,7 @@ class Server(EventEmitter):
f'handle=0x{characteristic.handle:04X}: {value.hex()}'
)
# Sanity check
# Check parameters
if len(value) != 2:
logger.warning('CCCD value not 2 bytes long')
return
@@ -379,7 +380,7 @@ class Server(EventEmitter):
# Get or encode the value
value = (
attribute.read_value(connection)
await attribute.read_value(connection)
if value is None
else attribute.encode_value(value)
)
@@ -422,7 +423,7 @@ class Server(EventEmitter):
# Get or encode the value
value = (
attribute.read_value(connection)
await attribute.read_value(connection)
if value is None
else attribute.encode_value(value)
)
@@ -650,7 +651,8 @@ class Server(EventEmitter):
self.send_response(connection, response)
def on_att_find_by_type_value_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_find_by_type_value_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.3.3 Find By Type Value Request
'''
@@ -658,13 +660,13 @@ class Server(EventEmitter):
# Build list of returned attributes
pdu_space_available = connection.att_mtu - 2
attributes = []
for attribute in (
async for attribute in (
attribute
for attribute in self.attributes
if attribute.handle >= request.starting_handle
and attribute.handle <= request.ending_handle
and attribute.type == request.attribute_type
and attribute.read_value(connection) == request.attribute_value
and (await attribute.read_value(connection)) == request.attribute_value
and pdu_space_available >= 4
):
# TODO: check permissions
@@ -702,7 +704,8 @@ class Server(EventEmitter):
self.send_response(connection, response)
def on_att_read_by_type_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_read_by_type_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.1 Read By Type Request
'''
@@ -725,7 +728,7 @@ class Server(EventEmitter):
and pdu_space_available
):
try:
attribute_value = attribute.read_value(connection)
attribute_value = await attribute.read_value(connection)
except ATT_Error as error:
# If the first attribute is unreadable, return an error
# Otherwise return attributes up to this point
@@ -767,14 +770,15 @@ class Server(EventEmitter):
self.send_response(connection, response)
def on_att_read_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_read_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.3 Read Request
'''
if attribute := self.get_attribute(request.attribute_handle):
try:
value = attribute.read_value(connection)
value = await attribute.read_value(connection)
except ATT_Error as error:
response = ATT_Error_Response(
request_opcode_in_error=request.op_code,
@@ -792,14 +796,15 @@ class Server(EventEmitter):
)
self.send_response(connection, response)
def on_att_read_blob_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_read_blob_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.5 Read Blob Request
'''
if attribute := self.get_attribute(request.attribute_handle):
try:
value = attribute.read_value(connection)
value = await attribute.read_value(connection)
except ATT_Error as error:
response = ATT_Error_Response(
request_opcode_in_error=request.op_code,
@@ -836,7 +841,8 @@ class Server(EventEmitter):
)
self.send_response(connection, response)
def on_att_read_by_group_type_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_read_by_group_type_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.9 Read by Group Type Request
'''
@@ -864,7 +870,7 @@ class Server(EventEmitter):
):
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = attribute.read_value(connection)
attribute_value = await attribute.read_value(connection)
# Check the attribute value size
max_attribute_size = min(connection.att_mtu - 6, 251)
if len(attribute_value) > max_attribute_size:
@@ -903,7 +909,8 @@ class Server(EventEmitter):
self.send_response(connection, response)
def on_att_write_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_write_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.5.1 Write Request
'''
@@ -936,12 +943,13 @@ class Server(EventEmitter):
return
# Accept the value
attribute.write_value(connection, request.attribute_value)
await attribute.write_value(connection, request.attribute_value)
# Done
self.send_response(connection, ATT_Write_Response())
def on_att_write_command(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_write_command(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.5.3 Write Command
'''
@@ -959,7 +967,7 @@ class Server(EventEmitter):
# Accept the value
try:
attribute.write_value(connection, request.attribute_value)
await attribute.write_value(connection, request.attribute_value)
except Exception as error:
logger.exception(f'!!! ignoring exception: {error}')

View File

@@ -23,7 +23,7 @@ import functools
import logging
import secrets
import struct
from typing import Any, Dict, Callable, Optional, Type, Union, List
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union
from bumble import crypto
from .colors import color
@@ -223,41 +223,47 @@ HCI_VENDOR_EVENT = 0xFF
# HCI Subevent Codes
HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01
HCI_LE_ADVERTISING_REPORT_EVENT = 0x02
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06
HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B
HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10
HCI_LE_SCAN_TIMEOUT_EVENT = 0x11
HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15
HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16
HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18
HCI_LE_CIS_ESTABLISHED_EVENT = 0X19
HCI_LE_CIS_REQUEST_EVENT = 0X1A
HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D
HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F
HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22
HCI_LE_SUBRATE_CHANGE_EVENT = 0X23
HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01
HCI_LE_ADVERTISING_REPORT_EVENT = 0x02
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06
HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B
HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10
HCI_LE_SCAN_TIMEOUT_EVENT = 0x11
HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15
HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16
HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18
HCI_LE_CIS_ESTABLISHED_EVENT = 0X19
HCI_LE_CIS_REQUEST_EVENT = 0X1A
HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D
HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F
HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22
HCI_LE_SUBRATE_CHANGE_EVENT = 0X23
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT = 0X24
HCI_LE_PERIODIC_ADVERTISING_REPORT_V2_EVENT = 0X25
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_V2_EVENT = 0X26
HCI_LE_PERIODIC_ADVERTISING_SUBEVENT_DATA_REQUEST_EVENT = 0X27
HCI_LE_PERIODIC_ADVERTISING_RESPONSE_REPORT_EVENT = 0X28
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT = 0X29
# HCI Command
@@ -650,47 +656,6 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
# Command Status codes
HCI_COMMAND_STATUS_PENDING = 0
# LE Event Masks
HCI_LE_CONNECTION_COMPLETE_EVENT_MASK = (1 << 0)
HCI_LE_ADVERTISING_REPORT_EVENT_MASK = (1 << 1)
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT_MASK = (1 << 2)
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT_MASK = (1 << 3)
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT_MASK = (1 << 4)
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT_MASK = (1 << 5)
HCI_LE_DATA_LENGTH_CHANGE_EVENT_MASK = (1 << 6)
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT_MASK = (1 << 7)
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT_MASK = (1 << 8)
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT_MASK = (1 << 9)
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT_MASK = (1 << 10)
HCI_LE_PHY_UPDATE_COMPLETE_EVENT_MASK = (1 << 11)
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT_MASK = (1 << 12)
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT_MASK = (1 << 13)
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT_MASK = (1 << 14)
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT_MASK = (1 << 15)
HCI_LE_EXTENDED_SCAN_TIMEOUT_EVENT_MASK = (1 << 16)
HCI_LE_EXTENDED_ADVERTISING_SET_TERMINATED_EVENT_MASK = (1 << 17)
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT_MASK = (1 << 18)
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT_MASK = (1 << 19)
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT_MASK = (1 << 20)
HCI_LE_CONNECTION_IQ_REPORT_EVENT_MASK = (1 << 21)
HCI_LE_CTE_REQUEST_FAILED_EVENT_MASK = (1 << 22)
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT_MASK = (1 << 23)
HCI_LE_CIS_ESTABLISHED_EVENT_MASK = (1 << 24)
HCI_LE_CIS_REQUEST_EVENT_MASK = (1 << 25)
HCI_LE_CREATE_BIG_COMPLETE_EVENT_MASK = (1 << 26)
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT_MASK = (1 << 27)
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT_MASK = (1 << 28)
HCI_LE_BIG_SYNC_LOST_EVENT_MASK = (1 << 29)
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT_MASK = (1 << 30)
HCI_LE_PATH_LOSS_THRESHOLD_EVENT_MASK = (1 << 31)
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT_MASK = (1 << 32)
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT_MASK = (1 << 33)
HCI_LE_SUBRATE_CHANGE_EVENT_MASK = (1 << 34)
HCI_LE_EVENT_MASK_NAMES = {
mask: mask_name for (mask_name, mask) in globals().items()
if mask_name.startswith('HCI_LE_') and mask_name.endswith('_EVENT_MASK')
}
# ACL
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
@@ -732,15 +697,15 @@ HCI_LE_PHY_TYPE_TO_BIT = {
class Phy(enum.IntEnum):
LE_1M = 0x01
LE_2M = 0x02
LE_CODED = 0x03
LE_1M = HCI_LE_1M_PHY
LE_2M = HCI_LE_2M_PHY
LE_CODED = HCI_LE_CODED_PHY
class PhyBit(enum.IntFlag):
LE_1M = 0b00000001
LE_2M = 0b00000010
LE_CODED = 0b00000100
LE_1M = 1 << HCI_LE_1M_PHY_BIT
LE_2M = 1 << HCI_LE_2M_PHY_BIT
LE_CODED = 1 << HCI_LE_CODED_PHY_BIT
# Connection Parameters
@@ -1360,55 +1325,97 @@ HCI_SUPPORTED_COMMANDS_FLAGS = (
# LE Supported Features
# See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
HCI_ADVERTISING_CODING_SELECTION_LE_SUPPORTED_FEATURE = 40
HCI_ADVERTISING_CODING_SELECTION_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 41
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER_LE_SUPPORTED_FEATURE = 43
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER_LE_SUPPORTED_FEATURE = 44
class LeFeature(enum.IntEnum):
LE_ENCRYPTION = 0
CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1
EXTENDED_REJECT_INDICATION = 2
PERIPHERAL_INITIATED_FEATURE_EXCHANGE = 3
LE_PING = 4
LE_DATA_PACKET_LENGTH_EXTENSION = 5
LL_PRIVACY = 6
EXTENDED_SCANNER_FILTER_POLICIES = 7
LE_2M_PHY = 8
STABLE_MODULATION_INDEX_TRANSMITTER = 9
STABLE_MODULATION_INDEX_RECEIVER = 10
LE_CODED_PHY = 11
LE_EXTENDED_ADVERTISING = 12
LE_PERIODIC_ADVERTISING = 13
CHANNEL_SELECTION_ALGORITHM_2 = 14
LE_POWER_CLASS_1 = 15
MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE = 16
CONNECTION_CTE_REQUEST = 17
CONNECTION_CTE_RESPONSE = 18
CONNECTIONLESS_CTE_TRANSMITTER = 19
CONNECTIONLESS_CTR_RECEIVER = 20
ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION = 21
ANTENNA_SWITCHING_DURING_CTE_RECEPTION = 22
RECEIVING_CONSTANT_TONE_EXTENSIONS = 23
PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER = 24
PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT = 25
SLEEP_CLOCK_ACCURACY_UPDATES = 26
REMOTE_PUBLIC_KEY_VALIDATION = 27
CONNECTED_ISOCHRONOUS_STREAM_CENTRAL = 28
CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL = 29
ISOCHRONOUS_BROADCASTER = 30
SYNCHRONIZED_RECEIVER = 31
CONNECTED_ISOCHRONOUS_STREAM = 32
LE_POWER_CONTROL_REQUEST = 33
LE_POWER_CONTROL_REQUEST_DUP = 34
LE_PATH_LOSS_MONITORING = 35
PERIODIC_ADVERTISING_ADI_SUPPORT = 36
CONNECTION_SUBRATING = 37
CONNECTION_SUBRATING_HOST_SUPPORT = 38
CHANNEL_CLASSIFICATION = 39
ADVERTISING_CODING_SELECTION = 40
ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 41
PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 43
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 44
HCI_LE_SUPPORTED_FEATURES_NAMES = {
flag: feature_name for (feature_name, flag) in globals().items()
if feature_name.startswith('HCI_') and feature_name.endswith('_LE_SUPPORTED_FEATURE')
}
class LeFeatureMask(enum.IntFlag):
LE_ENCRYPTION = 1 << LeFeature.LE_ENCRYPTION
CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1 << LeFeature.CONNECTION_PARAMETERS_REQUEST_PROCEDURE
EXTENDED_REJECT_INDICATION = 1 << LeFeature.EXTENDED_REJECT_INDICATION
PERIPHERAL_INITIATED_FEATURE_EXCHANGE = 1 << LeFeature.PERIPHERAL_INITIATED_FEATURE_EXCHANGE
LE_PING = 1 << LeFeature.LE_PING
LE_DATA_PACKET_LENGTH_EXTENSION = 1 << LeFeature.LE_DATA_PACKET_LENGTH_EXTENSION
LL_PRIVACY = 1 << LeFeature.LL_PRIVACY
EXTENDED_SCANNER_FILTER_POLICIES = 1 << LeFeature.EXTENDED_SCANNER_FILTER_POLICIES
LE_2M_PHY = 1 << LeFeature.LE_2M_PHY
STABLE_MODULATION_INDEX_TRANSMITTER = 1 << LeFeature.STABLE_MODULATION_INDEX_TRANSMITTER
STABLE_MODULATION_INDEX_RECEIVER = 1 << LeFeature.STABLE_MODULATION_INDEX_RECEIVER
LE_CODED_PHY = 1 << LeFeature.LE_CODED_PHY
LE_EXTENDED_ADVERTISING = 1 << LeFeature.LE_EXTENDED_ADVERTISING
LE_PERIODIC_ADVERTISING = 1 << LeFeature.LE_PERIODIC_ADVERTISING
CHANNEL_SELECTION_ALGORITHM_2 = 1 << LeFeature.CHANNEL_SELECTION_ALGORITHM_2
LE_POWER_CLASS_1 = 1 << LeFeature.LE_POWER_CLASS_1
MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE = 1 << LeFeature.MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE
CONNECTION_CTE_REQUEST = 1 << LeFeature.CONNECTION_CTE_REQUEST
CONNECTION_CTE_RESPONSE = 1 << LeFeature.CONNECTION_CTE_RESPONSE
CONNECTIONLESS_CTE_TRANSMITTER = 1 << LeFeature.CONNECTIONLESS_CTE_TRANSMITTER
CONNECTIONLESS_CTR_RECEIVER = 1 << LeFeature.CONNECTIONLESS_CTR_RECEIVER
ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION = 1 << LeFeature.ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION
ANTENNA_SWITCHING_DURING_CTE_RECEPTION = 1 << LeFeature.ANTENNA_SWITCHING_DURING_CTE_RECEPTION
RECEIVING_CONSTANT_TONE_EXTENSIONS = 1 << LeFeature.RECEIVING_CONSTANT_TONE_EXTENSIONS
PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER = 1 << LeFeature.PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER
PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT = 1 << LeFeature.PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT
SLEEP_CLOCK_ACCURACY_UPDATES = 1 << LeFeature.SLEEP_CLOCK_ACCURACY_UPDATES
REMOTE_PUBLIC_KEY_VALIDATION = 1 << LeFeature.REMOTE_PUBLIC_KEY_VALIDATION
CONNECTED_ISOCHRONOUS_STREAM_CENTRAL = 1 << LeFeature.CONNECTED_ISOCHRONOUS_STREAM_CENTRAL
CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL = 1 << LeFeature.CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL
ISOCHRONOUS_BROADCASTER = 1 << LeFeature.ISOCHRONOUS_BROADCASTER
SYNCHRONIZED_RECEIVER = 1 << LeFeature.SYNCHRONIZED_RECEIVER
CONNECTED_ISOCHRONOUS_STREAM = 1 << LeFeature.CONNECTED_ISOCHRONOUS_STREAM
LE_POWER_CONTROL_REQUEST = 1 << LeFeature.LE_POWER_CONTROL_REQUEST
LE_POWER_CONTROL_REQUEST_DUP = 1 << LeFeature.LE_POWER_CONTROL_REQUEST_DUP
LE_PATH_LOSS_MONITORING = 1 << LeFeature.LE_PATH_LOSS_MONITORING
PERIODIC_ADVERTISING_ADI_SUPPORT = 1 << LeFeature.PERIODIC_ADVERTISING_ADI_SUPPORT
CONNECTION_SUBRATING = 1 << LeFeature.CONNECTION_SUBRATING
CONNECTION_SUBRATING_HOST_SUPPORT = 1 << LeFeature.CONNECTION_SUBRATING_HOST_SUPPORT
CHANNEL_CLASSIFICATION = 1 << LeFeature.CHANNEL_CLASSIFICATION
ADVERTISING_CODING_SELECTION = 1 << LeFeature.ADVERTISING_CODING_SELECTION
ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 1 << LeFeature.ADVERTISING_CODING_SELECTION_HOST_SUPPORT
PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 1 << LeFeature.PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 1 << LeFeature.PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER
# fmt: on
@@ -2026,6 +2033,17 @@ class OwnAddressType(enum.IntEnum):
return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name}
# -----------------------------------------------------------------------------
class LoopbackMode(enum.IntEnum):
DISABLED = 0
LOCAL = 1
REMOTE = 2
@classmethod
def type_spec(cls):
return {'size': 1, 'mapper': lambda x: LoopbackMode(x).name}
# -----------------------------------------------------------------------------
class HCI_Packet:
'''
@@ -2857,6 +2875,20 @@ class HCI_Set_Event_Mask_Command(HCI_Command):
See Bluetooth spec @ 7.3.1 Set Event Mask Command
'''
@staticmethod
def mask(event_codes: Iterable[int]) -> bytes:
'''
Compute the event mask value for a list of events.
'''
# NOTE: this implementation takes advantage of the fact that as of version 5.4
# of the core specification, the bit number for each event code is equal to one
# less than the event code.
# If future versions of the specification deviate from that, a different
# implementation would be needed.
return sum((1 << event_code - 1) for event_code in event_codes).to_bytes(
8, 'little'
)
# -----------------------------------------------------------------------------
@HCI_Command.command()
@@ -3352,6 +3384,27 @@ class HCI_Read_Encryption_Key_Size_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('loopback_mode', LoopbackMode.type_spec()),
],
)
class HCI_Read_Loopback_Mode_Command(HCI_Command):
'''
See Bluetooth spec @ 7.6.1 Read Loopback Mode Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([('loopback_mode', 1)])
class HCI_Write_Loopback_Mode_Command(HCI_Command):
'''
See Bluetooth spec @ 7.6.2 Write Loopback Mode Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([('le_event_mask', 8)])
class HCI_LE_Set_Event_Mask_Command(HCI_Command):
@@ -3359,6 +3412,20 @@ class HCI_LE_Set_Event_Mask_Command(HCI_Command):
See Bluetooth spec @ 7.8.1 LE Set Event Mask Command
'''
@staticmethod
def mask(event_codes: Iterable[int]) -> bytes:
'''
Compute the event mask value for a list of events.
'''
# NOTE: this implementation takes advantage of the fact that as of version 5.4
# of the core specification, the bit number for each event code is equal to one
# less than the event code.
# If future versions of the specification deviate from that, a different
# implementation would be needed.
return sum((1 << event_code - 1) for event_code in event_codes).to_bytes(
8, 'little'
)
# -----------------------------------------------------------------------------
@HCI_Command.command(
@@ -3966,13 +4033,16 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
('advertising_sid', 1),
('scan_request_notification_enable', 1),
],
return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx__power', 1)],
return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx_power', 1)],
)
class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
'''
TX_POWER_NO_PREFERENCE = 0x7F
SHOULD_NOT_FRAGMENT = 0x01
class AdvertisingProperties(enum.IntFlag):
CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1
@@ -4217,7 +4287,7 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
('scanning_filter_policy:', self.scanning_filter_policy),
('scanning_phys: ', ','.join(scanning_phys_strs)),
]
for (i, scanning_phy_str) in enumerate(scanning_phys_strs):
for i, scanning_phy_str in enumerate(scanning_phys_strs):
fields.append(
(
f'{scanning_phy_str}.scan_type: ',
@@ -4360,7 +4430,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
('peer_address: ', str(self.peer_address)),
('initiating_phys: ', ','.join(initiating_phys_strs)),
]
for (i, initiating_phys_str) in enumerate(initiating_phys_strs):
for i, initiating_phys_str in enumerate(initiating_phys_strs):
fields.append(
(
f'{initiating_phys_str}.scan_interval: ',
@@ -4733,7 +4803,11 @@ class HCI_Event(HCI_Packet):
HCI_Object.init_from_bytes(self, parameters, 0, fields)
return self
def __init__(self, event_code, parameters=None, **kwargs):
def __init__(self, event_code=-1, parameters=None, **kwargs):
# Since the legacy implementation relies on an __init__ injector, typing always
# complains that positional argument event_code is not passed, so here sets a
# default value to allow building derived HCI_Event without event_code.
assert event_code != -1
super().__init__(HCI_Event.event_name(event_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
HCI_Object.init_from_fields(self, fields, kwargs)
@@ -4827,7 +4901,8 @@ class HCI_Extended_Event(HCI_Event):
HCI_Object.init_from_bytes(self, parameters, 1, fields)
return self
def __init__(self, subevent_code, parameters, **kwargs):
def __init__(self, subevent_code=None, parameters=None, **kwargs):
assert subevent_code is not None
self.subevent_code = subevent_code
if parameters is None and (fields := getattr(self, 'fields', None)) and kwargs:
parameters = bytes([subevent_code]) + HCI_Object.dict_to_bytes(
@@ -5242,7 +5317,7 @@ HCI_LE_Meta_Event.subevent_classes[
('status', 1),
('advertising_handle', 1),
('connection_handle', 2),
('number_completed_extended_advertising_events', 1),
('num_completed_extended_advertising_events', 1),
]
)
class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event):
@@ -6183,7 +6258,7 @@ class HCI_IsoDataPacket(HCI_Packet):
if ts_flag:
if not should_include_sdu_info:
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}')
logger.warning(f'Timestamp included when pb_flag={bin(pb_flag)}')
time_stamp, *_ = struct.unpack_from('<I', packet, pos)
pos += 4
@@ -6296,7 +6371,7 @@ class HCI_AclDataPacketAssembler:
self.current_data = None
self.l2cap_pdu_length = 0
else:
# Sanity check
# Compliance check
if len(self.current_data) > self.l2cap_pdu_length + 4:
logger.warning('!!! ACL data exceeds L2CAP PDU')
self.current_data = None

View File

@@ -18,10 +18,16 @@
from __future__ import annotations
from collections.abc import Callable, MutableMapping
from typing import cast, Any
from typing import cast, Any, Optional
import logging
from bumble import avc
from bumble import avctp
from bumble import avdtp
from bumble import avrcp
from bumble import crypto
from bumble import rfcomm
from bumble import sdp
from bumble.colors import color
from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
@@ -47,9 +53,7 @@ from bumble.hci import (
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
)
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM
from bumble import crypto
# -----------------------------------------------------------------------------
# Logging
@@ -59,28 +63,35 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP',
rfcomm.RFCOMM_PSM: 'RFCOMM',
sdp.SDP_PSM: 'SDP',
avdtp.AVDTP_PSM: 'AVDTP',
avctp.AVCTP_PSM: 'AVCTP'
# TODO: add more PSM values
}
AVCTP_PID_NAMES = {avrcp.AVRCP_PID: 'AVRCP'}
# -----------------------------------------------------------------------------
class PacketTracer:
class AclStream:
psms: MutableMapping[int, int]
peer: PacketTracer.AclStream
peer: Optional[PacketTracer.AclStream]
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
avctp_assemblers: MutableMapping[int, avctp.MessageAssembler]
def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.avctp_assemblers = {} # AVCTP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None
# pylint: disable=too-many-nested-blocks
def on_acl_pdu(self, pdu: bytes) -> None:
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
self.analyzer.emit(l2cap_pdu)
if l2cap_pdu.cid == ATT_CID:
att_pdu = ATT_PDU.from_bytes(l2cap_pdu.payload)
@@ -102,42 +113,51 @@ class PacketTracer:
connection_response.result
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
):
if self.peer:
if psm := self.peer.psms.get(
connection_response.source_cid
):
# Found a pending connection
self.psms[connection_response.destination_cid] = psm
# For AVDTP connections, create a packet assembler for
# each direction
if psm == avdtp.AVDTP_PSM:
self.avdtp_assemblers[
connection_response.source_cid
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
connection_response.destination_cid
] = avdtp.MessageAssembler(
self.peer.on_avdtp_message
)
if self.peer and (
psm := self.peer.psms.get(connection_response.source_cid)
):
# Found a pending connection
self.psms[connection_response.destination_cid] = psm
# For AVDTP connections, create a packet assembler for
# each direction
if psm == avdtp.AVDTP_PSM:
self.avdtp_assemblers[
connection_response.source_cid
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
connection_response.destination_cid
] = avdtp.MessageAssembler(self.peer.on_avdtp_message)
elif psm == avctp.AVCTP_PSM:
self.avctp_assemblers[
connection_response.source_cid
] = avctp.MessageAssembler(self.on_avctp_message)
self.peer.avctp_assemblers[
connection_response.destination_cid
] = avctp.MessageAssembler(self.peer.on_avctp_message)
else:
# Try to find the PSM associated with this PDU
if self.peer and (psm := self.peer.psms.get(l2cap_pdu.cid)):
if psm == SDP_PSM:
sdp_pdu = SDP_PDU.from_bytes(l2cap_pdu.payload)
if psm == sdp.SDP_PSM:
sdp_pdu = sdp.SDP_PDU.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(sdp_pdu)
elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
elif psm == rfcomm.RFCOMM_PSM:
rfcomm_frame = rfcomm.RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame)
elif psm == avdtp.AVDTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
)
assembler = self.avdtp_assemblers.get(l2cap_pdu.cid)
if assembler:
assembler.on_pdu(l2cap_pdu.payload)
if avdtp_assembler := self.avdtp_assemblers.get(l2cap_pdu.cid):
avdtp_assembler.on_pdu(l2cap_pdu.payload)
elif psm == avctp.AVCTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVCTP]: {l2cap_pdu.payload.hex()}'
)
if avctp_assembler := self.avctp_assemblers.get(l2cap_pdu.cid):
avctp_assembler.on_pdu(l2cap_pdu.payload)
else:
psm_string = name_or_number(PSM_NAMES, psm)
self.analyzer.emit(
@@ -154,6 +174,28 @@ class PacketTracer:
f'{color("AVDTP", "green")} [{transaction_label}] {message}'
)
def on_avctp_message(
self,
transaction_label: int,
is_command: bool,
ipid: bool,
pid: int,
payload: bytes,
):
if pid == avrcp.AVRCP_PID:
avc_frame = avc.Frame.from_bytes(payload)
details = str(avc_frame)
else:
details = payload.hex()
c_r = 'Command' if is_command else 'Response'
self.analyzer.emit(
f'{color("AVCTP", "green")} '
f'{c_r}[{transaction_label}][{name_or_number(AVCTP_PID_NAMES, pid)}] '
f'{"#" if ipid else ""}'
f'{details}'
)
def feed_packet(self, packet: HCI_AclDataPacket) -> None:
self.packet_assembler.feed_packet(packet)

View File

@@ -402,7 +402,7 @@ class Device(HID):
report_type = pdu[0] & 0x03
buffer_flag = (pdu[0] & 0x08) >> 3
report_id = pdu[1]
logger.debug("buffer_flag: " + str(buffer_flag))
logger.debug(f"buffer_flag: {buffer_flag}")
if buffer_flag == 1:
buffer_size = (pdu[3] << 8) | pdu[2]
else:

View File

@@ -18,67 +18,25 @@
from __future__ import annotations
import asyncio
import collections
import dataclasses
import logging
import struct
from typing import Any, Awaitable, Callable, Dict, Optional, Union, cast, TYPE_CHECKING
from typing import Any, Awaitable, Callable, Deque, Dict, Optional, cast, TYPE_CHECKING
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
from bumble import drivers
from .hci import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET,
HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
HCI_RESET_COMMAND,
HCI_SUCCESS,
HCI_SUPPORTED_COMMANDS_FLAGS,
HCI_SYNCHRONOUS_DATA_PACKET,
HCI_VERSION_BLUETOOTH_CORE_4_0,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Constant,
HCI_Error,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Local_Supported_Features_Command,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command,
HCI_LE_Set_Event_Mask_Command,
HCI_LE_Write_Suggested_Default_Data_Length_Command,
HCI_Link_Key_Request_Negative_Reply_Command,
HCI_Link_Key_Request_Reply_Command,
HCI_Packet,
HCI_Read_Buffer_Size_Command,
HCI_Read_Local_Supported_Commands_Command,
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_SynchronousDataPacket,
)
from .core import (
from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
ConnectionPHY,
ConnectionParameters,
)
from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
from bumble.utils import AbortableEventEmitter
from bumble.transport.common import TransportLostError
if TYPE_CHECKING:
from .transport.common import TransportSink, TransportSource
@@ -91,28 +49,70 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# fmt: off
class AclPacketQueue:
max_packet_size: int
HOST_DEFAULT_HC_LE_ACL_DATA_PACKET_LENGTH = 27
HOST_HC_TOTAL_NUM_LE_ACL_DATA_PACKETS = 1
HOST_DEFAULT_HC_ACL_DATA_PACKET_LENGTH = 27
HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS = 1
def __init__(
self,
max_packet_size: int,
max_in_flight: int,
send: Callable[[hci.HCI_Packet], None],
) -> None:
self.max_packet_size = max_packet_size
self.max_in_flight = max_in_flight
self.in_flight = 0
self.send = send
self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque()
# fmt: on
def enqueue(self, packet: hci.HCI_AclDataPacket) -> None:
self.packets.appendleft(packet)
self.check_queue()
if self.packets:
logger.debug(
f'{self.in_flight} ACL packets in flight, '
f'{len(self.packets)} in queue'
)
def check_queue(self) -> None:
while self.packets and self.in_flight < self.max_in_flight:
packet = self.packets.pop()
self.send(packet)
self.in_flight += 1
def on_packets_completed(self, packet_count: int) -> None:
if packet_count > self.in_flight:
logger.warning(
color(
'!!! {packet_count} completed but only '
f'{self.in_flight} in flight'
)
)
packet_count = self.in_flight
self.in_flight -= packet_count
self.check_queue()
# -----------------------------------------------------------------------------
class Connection:
def __init__(self, host: Host, handle: int, peer_address: Address, transport: int):
def __init__(
self, host: Host, handle: int, peer_address: hci.Address, transport: int
):
self.host = host
self.handle = handle
self.peer_address = peer_address
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
acl_packet_queue: Optional[AclPacketQueue] = (
host.le_acl_packet_queue
if transport == BT_LE_TRANSPORT
else host.acl_packet_queue
)
assert acl_packet_queue
self.acl_packet_queue = acl_packet_queue
def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None:
def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
self.assembler.feed_packet(packet)
def on_acl_pdu(self, pdu: bytes) -> None:
@@ -120,16 +120,33 @@ class Connection:
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ScoLink:
peer_address: hci.Address
handle: int
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
peer_address: hci.Address
handle: int
# -----------------------------------------------------------------------------
class Host(AbortableEventEmitter):
connections: Dict[int, Connection]
acl_packet_queue: collections.deque[HCI_AclDataPacket]
cis_links: Dict[int, CisLink]
sco_links: Dict[int, ScoLink]
acl_packet_queue: Optional[AclPacketQueue] = None
le_acl_packet_queue: Optional[AclPacketQueue] = None
hci_sink: Optional[TransportSink] = None
hci_metadata: Dict[str, Any]
long_term_key_provider: Optional[
Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
]
link_key_provider: Optional[Callable[[Address], Awaitable[Optional[bytes]]]]
link_key_provider: Optional[Callable[[hci.Address], Awaitable[Optional[bytes]]]]
def __init__(
self,
@@ -141,14 +158,12 @@ class Host(AbortableEventEmitter):
self.hci_metadata = {}
self.ready = False # True when we can accept incoming packets
self.connections = {} # Connections, by connection handle
self.cis_links = {} # CIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.pending_command = None
self.pending_response = None
self.hc_le_acl_data_packet_length = HOST_DEFAULT_HC_LE_ACL_DATA_PACKET_LENGTH
self.hc_total_num_le_acl_data_packets = HOST_HC_TOTAL_NUM_LE_ACL_DATA_PACKETS
self.hc_acl_data_packet_length = HOST_DEFAULT_HC_ACL_DATA_PACKET_LENGTH
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_le_features = 0
@@ -168,7 +183,7 @@ class Host(AbortableEventEmitter):
def find_connection_by_bd_addr(
self,
bd_addr: Address,
bd_addr: hci.Address,
transport: Optional[int] = None,
check_address_type: bool = False,
) -> Optional[Connection]:
@@ -210,96 +225,196 @@ class Host(AbortableEventEmitter):
# Send a reset command unless a driver has already done so.
if reset_needed:
await self.send_command(HCI_Reset_Command(), check_result=True)
await self.send_command(hci.HCI_Reset_Command(), check_result=True)
self.ready = True
response = await self.send_command(
HCI_Read_Local_Supported_Commands_Command(), check_result=True
hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True
)
self.local_supported_commands = response.return_parameters.supported_commands
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(
HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
hci.HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
)
self.local_le_features = struct.unpack(
'<Q', response.return_parameters.le_features
)[0]
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(
HCI_Read_Local_Version_Information_Command(), check_result=True
hci.HCI_Read_Local_Version_Information_Command(), check_result=True
)
self.local_version = response.return_parameters
await self.send_command(
HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('FFFFFFFFFFFFFF3F'))
hci.HCI_Set_Event_Mask_Command(
event_mask=hci.HCI_Set_Event_Mask_Command.mask(
[
hci.HCI_INQUIRY_COMPLETE_EVENT,
hci.HCI_INQUIRY_RESULT_EVENT,
hci.HCI_CONNECTION_COMPLETE_EVENT,
hci.HCI_CONNECTION_REQUEST_EVENT,
hci.HCI_DISCONNECTION_COMPLETE_EVENT,
hci.HCI_AUTHENTICATION_COMPLETE_EVENT,
hci.HCI_REMOTE_NAME_REQUEST_COMPLETE_EVENT,
hci.HCI_ENCRYPTION_CHANGE_EVENT,
hci.HCI_CHANGE_CONNECTION_LINK_KEY_COMPLETE_EVENT,
hci.HCI_LINK_KEY_TYPE_CHANGED_EVENT,
hci.HCI_READ_REMOTE_SUPPORTED_FEATURES_COMPLETE_EVENT,
hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMPLETE_EVENT,
hci.HCI_QOS_SETUP_COMPLETE_EVENT,
hci.HCI_HARDWARE_ERROR_EVENT,
hci.HCI_FLUSH_OCCURRED_EVENT,
hci.HCI_ROLE_CHANGE_EVENT,
hci.HCI_MODE_CHANGE_EVENT,
hci.HCI_RETURN_LINK_KEYS_EVENT,
hci.HCI_PIN_CODE_REQUEST_EVENT,
hci.HCI_LINK_KEY_REQUEST_EVENT,
hci.HCI_LINK_KEY_NOTIFICATION_EVENT,
hci.HCI_LOOPBACK_COMMAND_EVENT,
hci.HCI_DATA_BUFFER_OVERFLOW_EVENT,
hci.HCI_MAX_SLOTS_CHANGE_EVENT,
hci.HCI_READ_CLOCK_OFFSET_COMPLETE_EVENT,
hci.HCI_CONNECTION_PACKET_TYPE_CHANGED_EVENT,
hci.HCI_QOS_VIOLATION_EVENT,
hci.HCI_PAGE_SCAN_REPETITION_MODE_CHANGE_EVENT,
hci.HCI_FLOW_SPECIFICATION_COMPLETE_EVENT,
hci.HCI_INQUIRY_RESULT_WITH_RSSI_EVENT,
hci.HCI_READ_REMOTE_EXTENDED_FEATURES_COMPLETE_EVENT,
hci.HCI_SYNCHRONOUS_CONNECTION_COMPLETE_EVENT,
hci.HCI_SYNCHRONOUS_CONNECTION_CHANGED_EVENT,
hci.HCI_SNIFF_SUBRATING_EVENT,
hci.HCI_EXTENDED_INQUIRY_RESULT_EVENT,
hci.HCI_ENCRYPTION_KEY_REFRESH_COMPLETE_EVENT,
hci.HCI_IO_CAPABILITY_REQUEST_EVENT,
hci.HCI_IO_CAPABILITY_RESPONSE_EVENT,
hci.HCI_USER_CONFIRMATION_REQUEST_EVENT,
hci.HCI_USER_PASSKEY_REQUEST_EVENT,
hci.HCI_REMOTE_OOB_DATA_REQUEST_EVENT,
hci.HCI_SIMPLE_PAIRING_COMPLETE_EVENT,
hci.HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT,
hci.HCI_ENHANCED_FLUSH_COMPLETE_EVENT,
hci.HCI_USER_PASSKEY_NOTIFICATION_EVENT,
hci.HCI_KEYPRESS_NOTIFICATION_EVENT,
hci.HCI_REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION_EVENT,
hci.HCI_LE_META_EVENT,
]
)
)
)
if (
self.local_version is not None
and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0
and self.local_version.hci_version <= hci.HCI_VERSION_BLUETOOTH_CORE_4_0
):
# Some older controllers don't like event masks with bits they don't
# understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
le_event_mask = bytes.fromhex('FFFFFFFF00000000')
le_event_mask = hci.HCI_LE_Set_Event_Mask_Command.mask(
[
hci.HCI_LE_CONNECTION_COMPLETE_EVENT,
hci.HCI_LE_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_EVENT,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT,
hci.HCI_LE_DATA_LENGTH_CHANGE_EVENT,
hci.HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT,
hci.HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT,
hci.HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT,
hci.HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
hci.HCI_LE_SCAN_TIMEOUT_EVENT,
hci.HCI_LE_ADVERTISING_SET_TERMINATED_EVENT,
hci.HCI_LE_SCAN_REQUEST_RECEIVED_EVENT,
hci.HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT,
hci.HCI_LE_CONNECTION_IQ_REPORT_EVENT,
hci.HCI_LE_CTE_REQUEST_FAILED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT,
hci.HCI_LE_CIS_ESTABLISHED_EVENT,
hci.HCI_LE_CIS_REQUEST_EVENT,
hci.HCI_LE_CREATE_BIG_COMPLETE_EVENT,
hci.HCI_LE_TERMINATE_BIG_COMPLETE_EVENT,
hci.HCI_LE_BIG_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_BIG_SYNC_LOST_EVENT,
hci.HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT,
hci.HCI_LE_PATH_LOSS_THRESHOLD_EVENT,
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
]
)
await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
hci.HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
)
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
if self.supports_command(hci.HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
HCI_Read_Buffer_Size_Command(), check_result=True
hci.HCI_Read_Buffer_Size_Command(), check_result=True
)
self.hc_acl_data_packet_length = (
hc_acl_data_packet_length = (
response.return_parameters.hc_acl_data_packet_length
)
self.hc_total_num_acl_data_packets = (
hc_total_num_acl_data_packets = (
response.return_parameters.hc_total_num_acl_data_packets
)
logger.debug(
'HCI ACL flow control: '
f'hc_acl_data_packet_length={self.hc_acl_data_packet_length},'
f'hc_total_num_acl_data_packets={self.hc_total_num_acl_data_packets}'
f'hc_acl_data_packet_length={hc_acl_data_packet_length},'
f'hc_total_num_acl_data_packets={hc_total_num_acl_data_packets}'
)
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True
self.acl_packet_queue = AclPacketQueue(
max_packet_size=hc_acl_data_packet_length,
max_in_flight=hc_total_num_acl_data_packets,
send=self.send_hci_packet,
)
self.hc_le_acl_data_packet_length = (
hc_le_acl_data_packet_length = 0
hc_total_num_le_acl_data_packets = 0
if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
hc_le_acl_data_packet_length = (
response.return_parameters.hc_le_acl_data_packet_length
)
self.hc_total_num_le_acl_data_packets = (
hc_total_num_le_acl_data_packets = (
response.return_parameters.hc_total_num_le_acl_data_packets
)
logger.debug(
'HCI LE ACL flow control: '
f'hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length},'
'hc_total_num_le_acl_data_packets='
f'{self.hc_total_num_le_acl_data_packets}'
f'hc_le_acl_data_packet_length={hc_le_acl_data_packet_length},'
f'hc_total_num_le_acl_data_packets={hc_total_num_le_acl_data_packets}'
)
if (
response.return_parameters.hc_le_acl_data_packet_length == 0
or response.return_parameters.hc_total_num_le_acl_data_packets == 0
):
# LE and Classic share the same values
self.hc_le_acl_data_packet_length = self.hc_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = (
self.hc_total_num_acl_data_packets
)
if hc_le_acl_data_packet_length == 0 or hc_total_num_le_acl_data_packets == 0:
# LE and Classic share the same queue
self.le_acl_packet_queue = self.acl_packet_queue
else:
# Create a separate queue for LE
self.le_acl_packet_queue = AclPacketQueue(
max_packet_size=hc_le_acl_data_packet_length,
max_in_flight=hc_total_num_le_acl_data_packets,
send=self.send_hci_packet,
)
if self.supports_command(
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
) and self.supports_command(HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
) and self.supports_command(
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
):
response = await self.send_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command()
hci.HCI_LE_Read_Suggested_Default_Data_Length_Command()
)
suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
@@ -308,12 +423,34 @@ class Host(AbortableEventEmitter):
or suggested_max_tx_time != self.suggested_max_tx_time
):
await self.send_command(
HCI_LE_Write_Suggested_Default_Data_Length_Command(
hci.HCI_LE_Write_Suggested_Default_Data_Length_Command(
suggested_max_tx_octets=self.suggested_max_tx_octets,
suggested_max_tx_time=self.suggested_max_tx_time,
)
)
if self.supports_command(
hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
):
response = await self.send_command(
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(),
check_result=True,
)
self.number_of_supported_advertising_sets = (
response.return_parameters.num_supported_advertising_sets
)
if self.supports_command(
hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND
):
response = await self.send_command(
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command(),
check_result=True,
)
self.maximum_advertising_data_length = (
response.return_parameters.max_advertising_data_length
)
@property
def controller(self) -> Optional[TransportSink]:
return self.hci_sink
@@ -331,15 +468,14 @@ class Host(AbortableEventEmitter):
source.set_packet_sink(self)
self.hci_metadata = getattr(source, 'metadata', self.hci_metadata)
def send_hci_packet(self, packet: HCI_Packet) -> None:
def send_hci_packet(self, packet: hci.HCI_Packet) -> None:
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}')
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
if self.hci_sink:
self.hci_sink.on_packet(bytes(packet))
async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
# Wait until we can send (only one pending command at a time)
async with self.command_semaphore:
assert self.pending_command is None
@@ -363,11 +499,12 @@ class Host(AbortableEventEmitter):
else:
status = response.return_parameters.status
if status != HCI_SUCCESS:
if status != hci.HCI_SUCCESS:
logger.warning(
f'{command.name} failed ({HCI_Constant.error_name(status)})'
f'{command.name} failed '
f'({hci.HCI_Constant.error_name(status)})'
)
raise HCI_Error(status)
raise hci.HCI_Error(status)
return response
except Exception as error:
@@ -380,13 +517,24 @@ class Host(AbortableEventEmitter):
self.pending_response = None
# Use this method to send a command from a task
def send_command_sync(self, command: HCI_Command) -> None:
async def send_command(command: HCI_Command) -> None:
def send_command_sync(self, command: hci.HCI_Command) -> None:
async def send_command(command: hci.HCI_Command) -> None:
await self.send_command(command)
asyncio.create_task(send_command(command))
def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
if not (connection := self.connections.get(connection_handle)):
logger.warning(f'connection 0x{connection_handle:04X} not found')
return
packet_queue = connection.acl_packet_queue
if packet_queue is None:
logger.warning(
f'no ACL packet queue for connection 0x{connection_handle:04X}'
)
return
# Create a PDU
l2cap_pdu = bytes(L2CAP_PDU(cid, pdu))
# Send the data to the controller via ACL packets
@@ -394,46 +542,23 @@ class Host(AbortableEventEmitter):
offset = 0
pb_flag = 0
while bytes_remaining:
# TODO: support different LE/Classic lengths
data_total_length = min(bytes_remaining, self.hc_le_acl_data_packet_length)
acl_packet = HCI_AclDataPacket(
data_total_length = min(bytes_remaining, packet_queue.max_packet_size)
acl_packet = hci.HCI_AclDataPacket(
connection_handle=connection_handle,
pb_flag=pb_flag,
bc_flag=0,
data_total_length=data_total_length,
data=l2cap_pdu[offset : offset + data_total_length],
)
logger.debug(
f'{color("### HOST -> CONTROLLER", "blue")}: (CID={cid}) {acl_packet}'
)
self.queue_acl_packet(acl_packet)
logger.debug(f'>>> ACL packet enqueue: (CID={cid}) {acl_packet}')
packet_queue.enqueue(acl_packet)
pb_flag = 1
offset += data_total_length
bytes_remaining -= data_total_length
def queue_acl_packet(self, acl_packet: HCI_AclDataPacket) -> None:
self.acl_packet_queue.appendleft(acl_packet)
self.check_acl_packet_queue()
if len(self.acl_packet_queue):
logger.debug(
f'{self.acl_packets_in_flight} ACL packets in flight, '
f'{len(self.acl_packet_queue)} in queue'
)
def check_acl_packet_queue(self) -> None:
# Send all we can (TODO: support different LE/Classic limits)
while (
len(self.acl_packet_queue) > 0
and self.acl_packets_in_flight < self.hc_total_num_le_acl_data_packets
):
packet = self.acl_packet_queue.pop()
self.send_hci_packet(packet)
self.acl_packets_in_flight += 1
def supports_command(self, command):
# Find the support flag position for this command
for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for octet, flags in enumerate(hci.HCI_SUPPORTED_COMMANDS_FLAGS):
for flag_position, value in enumerate(flags):
if value == command:
# Check if the flag is set
@@ -448,17 +573,17 @@ class Host(AbortableEventEmitter):
def supported_commands(self):
commands = []
for octet, flags in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
if octet < len(hci.HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
command = HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
command = hci.HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
if command is not None:
commands.append(command)
return commands
def supports_le_feature(self, feature):
return (self.local_le_features & (1 << feature)) != 0
def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature
@property
def supported_le_features(self):
@@ -468,10 +593,10 @@ class Host(AbortableEventEmitter):
# Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet: bytes) -> None:
hci_packet = HCI_Packet.from_bytes(packet)
hci_packet = hci.HCI_Packet.from_bytes(packet)
if self.ready or (
isinstance(hci_packet, HCI_Command_Complete_Event)
and hci_packet.command_opcode == HCI_RESET_COMMAND
isinstance(hci_packet, hci.HCI_Command_Complete_Event)
and hci_packet.command_opcode == hci.HCI_RESET_COMMAND
):
self.on_hci_packet(hci_packet)
else:
@@ -484,44 +609,44 @@ class Host(AbortableEventEmitter):
self.emit('flush')
def on_hci_packet(self, packet: HCI_Packet) -> None:
def on_hci_packet(self, packet: hci.HCI_Packet) -> None:
logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}')
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST)
# If the packet is a command, invoke the handler for this packet
if packet.hci_packet_type == HCI_COMMAND_PACKET:
self.on_hci_command_packet(cast(HCI_Command, packet))
elif packet.hci_packet_type == HCI_EVENT_PACKET:
self.on_hci_event_packet(cast(HCI_Event, packet))
elif packet.hci_packet_type == HCI_ACL_DATA_PACKET:
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
if packet.hci_packet_type == hci.HCI_COMMAND_PACKET:
self.on_hci_command_packet(cast(hci.HCI_Command, packet))
elif packet.hci_packet_type == hci.HCI_EVENT_PACKET:
self.on_hci_event_packet(cast(hci.HCI_Event, packet))
elif packet.hci_packet_type == hci.HCI_ACL_DATA_PACKET:
self.on_hci_acl_data_packet(cast(hci.HCI_AclDataPacket, packet))
elif packet.hci_packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(hci.HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == hci.HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(hci.HCI_IsoDataPacket, packet))
else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
def on_hci_command_packet(self, command: HCI_Command) -> None:
def on_hci_command_packet(self, command: hci.HCI_Command) -> None:
logger.warning(f'!!! unexpected command packet: {command}')
def on_hci_event_packet(self, event: HCI_Event) -> None:
def on_hci_event_packet(self, event: hci.HCI_Event) -> None:
handler_name = f'on_{event.name.lower()}'
handler = getattr(self, handler_name, self.on_hci_event)
handler(event)
def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None:
def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
# Look for the connection to which this data belongs
if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet)
def on_hci_sco_data_packet(self, packet: HCI_SynchronousDataPacket) -> None:
def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None:
# Experimental
self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
def on_hci_iso_data_packet(self, packet: hci.HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
@@ -553,7 +678,7 @@ class Host(AbortableEventEmitter):
# This is used just for the Num_HCI_Command_Packets field, not related to
# an actual command
logger.debug('no-command event')
return None
return
return self.on_command_processed(event)
@@ -561,18 +686,17 @@ class Host(AbortableEventEmitter):
return self.on_command_processed(event)
def on_hci_number_of_completed_packets_event(self, event):
total_packets = sum(event.num_completed_packets)
if total_packets <= self.acl_packets_in_flight:
self.acl_packets_in_flight -= total_packets
self.check_acl_packet_queue()
else:
logger.warning(
color(
'!!! {total_packets} completed but only '
f'{self.acl_packets_in_flight} in flight'
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if not (connection := self.connections.get(connection_handle)):
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
)
)
self.acl_packets_in_flight = 0
continue
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
# Classic only
def on_hci_connection_request_event(self, event):
@@ -586,11 +710,11 @@ class Host(AbortableEventEmitter):
def on_hci_le_connection_complete_event(self, event):
# Check if this is a cancellation
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### LE CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.peer_address} as {HCI_Constant.role_name(event.role)}'
f'{event.peer_address} as {hci.HCI_Constant.role_name(event.role)}'
)
connection = self.connections.get(event.connection_handle)
@@ -630,7 +754,7 @@ class Host(AbortableEventEmitter):
self.on_hci_le_connection_complete_event(event)
def on_hci_connection_complete_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] '
@@ -666,25 +790,38 @@ class Host(AbortableEventEmitter):
def on_hci_disconnection_complete_event(self, event):
# Find the connection
if (connection := self.connections.get(event.connection_handle)) is None:
handle = event.connection_handle
if (
connection := (
self.connections.get(handle)
or self.cis_links.get(handle)
or self.sco_links.get(handle)
)
) is None:
logger.warning('!!! DISCONNECTION COMPLETE: unknown handle')
return
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
logger.debug(
f'### DISCONNECTION: [0x{event.connection_handle:04X}] '
f'### DISCONNECTION: [0x{handle:04X}] '
f'{connection.peer_address} '
f'reason={event.reason}'
)
del self.connections[event.connection_handle]
# Notify the listeners
self.emit('disconnection', event.connection_handle, event.reason)
self.emit('disconnection', handle, event.reason)
# Remove the handle reference
_ = (
self.connections.pop(handle, 0)
or self.cis_links.pop(handle, 0)
or self.sco_links.pop(handle, 0)
)
else:
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners
self.emit('disconnection_failure', event.connection_handle, event.status)
self.emit('disconnection_failure', handle, event.status)
def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None:
@@ -692,7 +829,7 @@ class Host(AbortableEventEmitter):
return
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
connection_parameters = ConnectionParameters(
event.connection_interval,
event.peripheral_latency,
@@ -712,7 +849,7 @@ class Host(AbortableEventEmitter):
return
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
self.emit('connection_phy_update', connection.handle, connection_phy)
else:
@@ -731,6 +868,7 @@ class Host(AbortableEventEmitter):
event.status,
event.advertising_handle,
event.connection_handle,
event.num_completed_extended_advertising_events,
)
def on_hci_le_cis_request_event(self, event):
@@ -744,7 +882,11 @@ class Host(AbortableEventEmitter):
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.cis_links[event.connection_handle] = CisLink(
handle=event.connection_handle,
peer_address=hci.Address.ANY,
)
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
@@ -759,7 +901,7 @@ class Host(AbortableEventEmitter):
# For now, just accept everything
# TODO: delegate the decision
self.send_command_sync(
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
connection_handle=event.connection_handle,
interval_min=event.interval_min,
interval_max=event.interval_max,
@@ -790,12 +932,12 @@ class Host(AbortableEventEmitter):
),
)
if long_term_key:
response = HCI_LE_Long_Term_Key_Request_Reply_Command(
response = hci.HCI_LE_Long_Term_Key_Request_Reply_Command(
connection_handle=event.connection_handle,
long_term_key=long_term_key,
)
else:
response = HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(
response = hci.HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(
connection_handle=event.connection_handle
)
@@ -804,13 +946,18 @@ class Host(AbortableEventEmitter):
asyncio.create_task(send_long_term_key())
def on_hci_synchronous_connection_complete_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### SCO CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.bd_addr}'
)
self.sco_links[event.connection_handle] = ScoLink(
peer_address=event.bd_addr,
handle=event.connection_handle,
)
# Notify the client
self.emit(
'sco_connection',
@@ -828,16 +975,16 @@ class Host(AbortableEventEmitter):
pass
def on_hci_role_change_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
logger.debug(
f'role change for {event.bd_addr}: '
f'{HCI_Constant.role_name(event.new_role)}'
f'{hci.HCI_Constant.role_name(event.new_role)}'
)
self.emit('role_change', event.bd_addr, event.new_role)
else:
logger.debug(
f'role change for {event.bd_addr} failed: '
f'{HCI_Constant.error_name(event.status)}'
f'{hci.HCI_Constant.error_name(event.status)}'
)
self.emit('role_change_failure', event.bd_addr, event.status)
@@ -853,7 +1000,7 @@ class Host(AbortableEventEmitter):
def on_hci_authentication_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('connection_authentication', event.connection_handle)
else:
self.emit(
@@ -864,7 +1011,7 @@ class Host(AbortableEventEmitter):
def on_hci_encryption_change_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit(
'connection_encryption_change',
event.connection_handle,
@@ -877,7 +1024,7 @@ class Host(AbortableEventEmitter):
def on_hci_encryption_key_refresh_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('connection_encryption_key_refresh', event.connection_handle)
else:
self.emit(
@@ -898,16 +1045,16 @@ class Host(AbortableEventEmitter):
def on_hci_link_key_notification_event(self, event):
logger.debug(
f'link key for {event.bd_addr}: {event.link_key.hex()}, '
f'type={HCI_Constant.link_key_type_name(event.key_type)}'
f'type={hci.HCI_Constant.link_key_type_name(event.key_type)}'
)
self.emit('link_key', event.bd_addr, event.link_key, event.key_type)
def on_hci_simple_pairing_complete_event(self, event):
logger.debug(
f'simple pairing complete for {event.bd_addr}: '
f'status={HCI_Constant.status_name(event.status)}'
f'status={hci.HCI_Constant.status_name(event.status)}'
)
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('classic_pairing', event.bd_addr)
else:
self.emit('classic_pairing_failure', event.bd_addr, event.status)
@@ -927,11 +1074,11 @@ class Host(AbortableEventEmitter):
self.link_key_provider(event.bd_addr),
)
if link_key:
response = HCI_Link_Key_Request_Reply_Command(
response = hci.HCI_Link_Key_Request_Reply_Command(
bd_addr=event.bd_addr, link_key=link_key
)
else:
response = HCI_Link_Key_Request_Negative_Reply_Command(
response = hci.HCI_Link_Key_Request_Negative_Reply_Command(
bd_addr=event.bd_addr
)
@@ -988,7 +1135,7 @@ class Host(AbortableEventEmitter):
)
def on_hci_remote_name_request_complete_event(self, event):
if event.status != HCI_SUCCESS:
if event.status != hci.HCI_SUCCESS:
self.emit('remote_name_failure', event.bd_addr, event.status)
else:
utf8_name = event.remote_name
@@ -1004,3 +1151,15 @@ class Host(AbortableEventEmitter):
event.bd_addr,
event.host_supported_features,
)
def on_hci_le_read_remote_features_complete_event(self, event):
if event.status != hci.HCI_SUCCESS:
self.emit(
'le_remote_features_failure', event.connection_handle, event.status
)
else:
self.emit(
'le_remote_features',
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)

View File

@@ -149,9 +149,10 @@ L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2046
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
@@ -188,8 +189,11 @@ class LeCreditBasedChannelSpec:
or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
):
raise ValueError('max credits out of range')
if self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU:
raise ValueError('MTU too small')
if (
self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU
or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU
):
raise ValueError('MTU out of range')
if (
self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS
or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS
@@ -204,7 +208,7 @@ class L2CAP_PDU:
@staticmethod
def from_bytes(data: bytes) -> L2CAP_PDU:
# Sanity check
# Check parameters
if len(data) < 4:
raise ValueError('not enough data for L2CAP header')
@@ -1644,12 +1648,13 @@ class ChannelManager:
def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None:
pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
pdu_bytes = bytes(pdu)
logger.debug(
f'{color(">>> Sending L2CAP PDU", "blue")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
f'{connection.peer_address}: {pdu_str}'
f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}'
)
self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu))
self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes)
def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):

View File

@@ -26,9 +26,13 @@ from bumble.hci import (
HCI_SUCCESS,
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
HCI_CONNECTION_TIMEOUT_ERROR,
HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
HCI_PAGE_TIMEOUT_ERROR,
HCI_Connection_Complete_Event,
)
from bumble import controller
from typing import Optional, Set
# -----------------------------------------------------------------------------
# Logging
@@ -57,6 +61,8 @@ class LocalLink:
Link bus for controllers to communicate with each other
'''
controllers: Set[controller.Controller]
def __init__(self):
self.controllers = set()
self.pending_connection = None
@@ -79,7 +85,9 @@ class LocalLink:
return controller
return None
def find_classic_controller(self, address):
def find_classic_controller(
self, address: Address
) -> Optional[controller.Controller]:
for controller in self.controllers:
if controller.public_address == address:
return controller
@@ -188,6 +196,60 @@ class LocalLink:
if peripheral_controller := self.find_controller(peripheral_address):
peripheral_controller.on_link_encrypted(central_address, rand, ediv, ltk)
def create_cis(
self,
central_controller: controller.Controller,
peripheral_address: Address,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'$$$ CIS Request {central_controller.random_address} -> {peripheral_address}'
)
if peripheral_controller := self.find_controller(peripheral_address):
asyncio.get_running_loop().call_soon(
peripheral_controller.on_link_cis_request,
central_controller.random_address,
cig_id,
cis_id,
)
def accept_cis(
self,
peripheral_controller: controller.Controller,
central_address: Address,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'$$$ CIS Accept {peripheral_controller.random_address} -> {central_address}'
)
if central_controller := self.find_controller(central_address):
asyncio.get_running_loop().call_soon(
central_controller.on_link_cis_established, cig_id, cis_id
)
asyncio.get_running_loop().call_soon(
peripheral_controller.on_link_cis_established, cig_id, cis_id
)
def disconnect_cis(
self,
initiator_controller: controller.Controller,
peer_address: Address,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'$$$ CIS Disconnect {initiator_controller.random_address} -> {peer_address}'
)
if peer_controller := self.find_controller(peer_address):
asyncio.get_running_loop().call_soon(
initiator_controller.on_link_cis_disconnected, cig_id, cis_id
)
asyncio.get_running_loop().call_soon(
peer_controller.on_link_cis_disconnected, cig_id, cis_id
)
############################################################
# Classic handlers
############################################################
@@ -271,6 +333,52 @@ class LocalLink:
initiator_controller.public_address, int(not (initiator_new_role))
)
def classic_sco_connect(
self,
initiator_controller: controller.Controller,
responder_address: Address,
link_type: int,
):
logger.debug(
f'[Classic] {initiator_controller.public_address} connects SCO to {responder_address}'
)
responder_controller = self.find_classic_controller(responder_address)
# Initiator controller should handle it.
assert responder_controller
responder_controller.on_classic_connection_request(
initiator_controller.public_address,
link_type,
)
def classic_accept_sco_connection(
self,
responder_controller: controller.Controller,
initiator_address: Address,
link_type: int,
):
logger.debug(
f'[Classic] {responder_controller.public_address} accepts to connect SCO {initiator_address}'
)
initiator_controller = self.find_classic_controller(initiator_address)
if initiator_controller is None:
responder_controller.on_classic_sco_connection_complete(
responder_controller.public_address,
HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
link_type,
)
return
async def task():
initiator_controller.on_classic_sco_connection_complete(
responder_controller.public_address, HCI_SUCCESS, link_type
)
asyncio.create_task(task())
responder_controller.on_classic_sco_connection_complete(
initiator_controller.public_address, HCI_SUCCESS, link_type
)
# -----------------------------------------------------------------------------
class RemoteLink:

View File

@@ -285,10 +285,11 @@ class HostService(HostServicer):
raise NotImplementedError(
"TODO: add support for extended advertising in Bumble"
)
if request.interval:
raise NotImplementedError("TODO: add support for `request.interval`")
if request.interval_range:
raise NotImplementedError("TODO: add support for `request.interval_range`")
if advertising_interval := request.interval:
self.device.config.advertising_interval_min = int(advertising_interval)
self.device.config.advertising_interval_max = int(advertising_interval)
if interval_range := request.interval_range:
self.device.config.advertising_interval_max += int(interval_range)
if request.primary_phy:
raise NotImplementedError("TODO: add support for `request.primary_phy`")
if request.secondary_phy:

View File

@@ -110,7 +110,7 @@ class PairingDelegate(BasePairingDelegate):
event = self.add_origin(PairingEvent(just_works=empty_pb2.Empty()))
self.service.event_queue.put_nowait(event)
answer = await anext(self.service.event_answer) # pytype: disable=name-error
answer = await anext(self.service.event_answer) # type: ignore
assert answer.event == event
assert answer.answer_variant() == 'confirm' and answer.confirm is not None
return answer.confirm
@@ -125,7 +125,7 @@ class PairingDelegate(BasePairingDelegate):
event = self.add_origin(PairingEvent(numeric_comparison=number))
self.service.event_queue.put_nowait(event)
answer = await anext(self.service.event_answer) # pytype: disable=name-error
answer = await anext(self.service.event_answer) # type: ignore
assert answer.event == event
assert answer.answer_variant() == 'confirm' and answer.confirm is not None
return answer.confirm
@@ -140,7 +140,7 @@ class PairingDelegate(BasePairingDelegate):
event = self.add_origin(PairingEvent(passkey_entry_request=empty_pb2.Empty()))
self.service.event_queue.put_nowait(event)
answer = await anext(self.service.event_answer) # pytype: disable=name-error
answer = await anext(self.service.event_answer) # type: ignore
assert answer.event == event
if answer.answer_variant() is None:
return None
@@ -157,7 +157,7 @@ class PairingDelegate(BasePairingDelegate):
event = self.add_origin(PairingEvent(pin_code_request=empty_pb2.Empty()))
self.service.event_queue.put_nowait(event)
answer = await anext(self.service.event_answer) # pytype: disable=name-error
answer = await anext(self.service.event_answer) # type: ignore
assert answer.event == event
if answer.answer_variant() is None:
return None

View File

@@ -18,7 +18,7 @@
# -----------------------------------------------------------------------------
import struct
import logging
from typing import List
from typing import List, Optional
from bumble import l2cap
from ..core import AdvertisingData
@@ -67,7 +67,7 @@ class AshaService(TemplateService):
self.emit('volume', connection, value[0])
# Handler for audio control commands
def on_audio_control_point_write(connection: Connection, value):
def on_audio_control_point_write(connection: Optional[Connection], value):
logger.info(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
opcode = value[0]
if opcode == AshaService.OPCODE_START:

View File

@@ -114,7 +114,7 @@ class SamplingFrequency(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
# fmt: off
FREQ_8000 = 0x01
FREQ_8000 = 0x01
FREQ_11025 = 0x02
FREQ_16000 = 0x03
FREQ_22050 = 0x04
@@ -430,7 +430,7 @@ class AseResponseCode(enum.IntEnum):
REJECTED_METADATA = 0x0B
INVALID_METADATA = 0x0C
INSUFFICIENT_RESOURCES = 0x0D
UNSPECIFIED_ERROR = 0x0E
UNSPECIFIED_ERROR = 0x0E
class AseReasonCode(enum.IntEnum):
@@ -1066,7 +1066,7 @@ class AseStateMachine(gatt.Characteristic):
# Readonly. Do nothing in the setter.
pass
def on_read(self, _: device.Connection) -> bytes:
def on_read(self, _: Optional[device.Connection]) -> bytes:
return self.value
def __str__(self) -> str:

View File

@@ -19,7 +19,7 @@
from __future__ import annotations
import enum
import struct
from typing import Optional
from typing import Optional, Tuple
from bumble import core
from bumble import crypto
@@ -31,6 +31,9 @@ from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
SET_IDENTITY_RESOLVING_KEY_LENGTH = 16
class SirkType(enum.IntEnum):
'''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.'''
@@ -66,6 +69,10 @@ def k1(n: bytes, salt: bytes, p: bytes) -> bytes:
def sef(k: bytes, r: bytes) -> bytes:
'''
Coordinated Set Identification Service - 4.5 SIRK encryption function sef.
SIRK decryption function sdf shares the same algorithm. The only difference is that argument r is:
* Plaintext in encryption
* Cipher in decryption
'''
return crypto.xor(k1(k, s1(b'SIRKenc'[::-1]), b'csis'[::-1]), r)
@@ -105,6 +112,11 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None,
) -> None:
if len(set_identity_resolving_key) != SET_IDENTITY_RESOLVING_KEY_LENGTH:
raise ValueError(
f'Invalid SIRK length {len(set_identity_resolving_key)}, expected {SET_IDENTITY_RESOLVING_KEY_LENGTH}'
)
characteristics = []
self.set_identity_resolving_key = set_identity_resolving_key
@@ -113,7 +125,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(read=self.on_sirk_read),
)
characteristics.append(self.set_identity_resolving_key_characteristic)
@@ -123,7 +135,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=struct.pack('B', coordinated_set_size),
)
characteristics.append(self.coordinated_set_size_characteristic)
@@ -134,7 +146,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY
| gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.READABLE
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| gatt.Characteristic.Permissions.WRITEABLE,
value=struct.pack('B', set_member_lock),
)
@@ -145,18 +157,32 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=struct.pack('B', set_member_rank),
)
characteristics.append(self.set_member_rank_characteristic)
super().__init__(characteristics)
def on_sirk_read(self, _connection: device.Connection) -> bytes:
async def on_sirk_read(self, connection: Optional[device.Connection]) -> bytes:
if self.set_identity_resolving_key_type == SirkType.PLAINTEXT:
return bytes([SirkType.PLAINTEXT]) + self.set_identity_resolving_key
sirk_bytes = self.set_identity_resolving_key
else:
raise NotImplementedError('TODO: Pending async Characteristic read.')
assert connection
if connection.transport == core.BT_LE_TRANSPORT:
key = await connection.device.get_long_term_key(
connection_handle=connection.handle, rand=b'', ediv=0
)
else:
key = await connection.device.get_link_key(connection.peer_address)
if not key:
raise RuntimeError('LTK or LinkKey is not present')
sirk_bytes = sef(key, self.set_identity_resolving_key)
return bytes([self.set_identity_resolving_key_type]) + sirk_bytes
def get_advertising_data(self) -> bytes:
return bytes(
@@ -203,3 +229,29 @@ class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy):
gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC
):
self.set_member_rank = characteristics[0]
async def read_set_identity_resolving_key(self) -> Tuple[SirkType, bytes]:
'''Reads SIRK and decrypts if encrypted.'''
response = await self.set_identity_resolving_key.read_value()
if len(response) != SET_IDENTITY_RESOLVING_KEY_LENGTH + 1:
raise RuntimeError('Invalid SIRK value')
sirk_type = SirkType(response[0])
if sirk_type == SirkType.PLAINTEXT:
sirk = response[1:]
else:
connection = self.service_proxy.client.connection
device = connection.device
if connection.transport == core.BT_LE_TRANSPORT:
key = await device.get_long_term_key(
connection_handle=connection.handle, rand=b'', ediv=0
)
else:
key = await device.get_link_key(connection.peer_address)
if not key:
raise RuntimeError('LTK or LinkKey is not present')
sirk = sef(key, response[1:])
return (sirk_type, sirk)

View File

@@ -118,8 +118,8 @@ CRC_TABLE = bytes([
0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF
])
RFCOMM_DEFAULT_INITIAL_RX_CREDITS = 7
RFCOMM_DEFAULT_PREFERRED_MTU = 1280
RFCOMM_DEFAULT_WINDOW_SIZE = 16
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1
RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
@@ -438,20 +438,24 @@ class DLC(EventEmitter):
multiplexer: Multiplexer,
dlci: int,
max_frame_size: int,
initial_tx_credits: int,
window_size: int,
) -> None:
super().__init__()
self.multiplexer = multiplexer
self.dlci = dlci
self.rx_credits = RFCOMM_DEFAULT_INITIAL_RX_CREDITS
self.rx_threshold = self.rx_credits // 2
self.tx_credits = initial_tx_credits
self.max_frame_size = max_frame_size
self.window_size = window_size
self.rx_credits = window_size
self.rx_threshold = window_size // 2
self.tx_credits = window_size
self.tx_buffer = b''
self.state = DLC.State.INIT
self.role = multiplexer.role
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.sink = None
self.connection_result = None
self.drained = asyncio.Event()
self.drained.set()
# Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs
@@ -534,14 +538,15 @@ class DLC(EventEmitter):
f'[{self.dlci}] {len(data)} bytes, '
f'rx_credits={self.rx_credits}: {data.hex()}'
)
if len(data) and self.sink:
self.sink(data) # pylint: disable=not-callable
if data:
if self.sink:
self.sink(data) # pylint: disable=not-callable
# Update the credits
if self.rx_credits > 0:
self.rx_credits -= 1
else:
logger.warning(color('!!! received frame with no rx credits', 'red'))
# Update the credits
if self.rx_credits > 0:
self.rx_credits -= 1
else:
logger.warning(color('!!! received frame with no rx credits', 'red'))
# Check if there's anything to send (including credits)
self.process_tx()
@@ -580,9 +585,9 @@ class DLC(EventEmitter):
cl=0xE0,
priority=7,
ack_timer=0,
max_frame_size=RFCOMM_DEFAULT_PREFERRED_MTU,
max_frame_size=self.max_frame_size,
max_retransmissions=0,
window_size=RFCOMM_DEFAULT_INITIAL_RX_CREDITS,
window_size=self.window_size,
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=0, data=bytes(pn))
logger.debug(f'>>> PN Response: {pn}')
@@ -591,7 +596,7 @@ class DLC(EventEmitter):
def rx_credits_needed(self) -> int:
if self.rx_credits <= self.rx_threshold:
return RFCOMM_DEFAULT_INITIAL_RX_CREDITS - self.rx_credits
return self.window_size - self.rx_credits
return 0
@@ -631,6 +636,8 @@ class DLC(EventEmitter):
)
rx_credits_needed = 0
if not self.tx_buffer:
self.drained.set()
# Stream protocol
def write(self, data: Union[bytes, str]) -> None:
@@ -643,11 +650,11 @@ class DLC(EventEmitter):
raise ValueError('write only accept bytes or strings')
self.tx_buffer += data
self.drained.clear()
self.process_tx()
def drain(self) -> None:
# TODO
pass
async def drain(self) -> None:
await self.drained.wait()
def __str__(self) -> str:
return f'DLC(dlci={self.dlci},state={self.state.name})'
@@ -843,7 +850,12 @@ class Multiplexer(EventEmitter):
)
await self.disconnection_result
async def open_dlc(self, channel: int) -> DLC:
async def open_dlc(
self,
channel: int,
max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE,
window_size: int = RFCOMM_DEFAULT_WINDOW_SIZE,
) -> DLC:
if self.state != Multiplexer.State.CONNECTED:
if self.state == Multiplexer.State.OPENING:
raise InvalidStateError('open already in progress')
@@ -855,9 +867,9 @@ class Multiplexer(EventEmitter):
cl=0xF0,
priority=7,
ack_timer=0,
max_frame_size=RFCOMM_DEFAULT_PREFERRED_MTU,
max_frame_size=max_frame_size,
max_retransmissions=0,
window_size=RFCOMM_DEFAULT_INITIAL_RX_CREDITS,
window_size=window_size,
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=1, data=bytes(pn))
logger.debug(f'>>> Sending MCC: {pn}')

View File

@@ -97,7 +97,8 @@ SDP_CLIENT_EXECUTABLE_URL_ATTRIBUTE_ID = 0X000B
SDP_ICON_URL_ATTRIBUTE_ID = 0X000C
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0X000D
# Attribute Identifier (cf. Assigned Numbers for Service Discovery)
# Profile-specific Attribute Identifiers (cf. Assigned Numbers for Service Discovery)
# used by AVRCP, HFP and A2DP
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID = 0x0311
@@ -115,7 +116,8 @@ SDP_ATTRIBUTE_ID_NAMES = {
SDP_DOCUMENTATION_URL_ATTRIBUTE_ID: 'SDP_DOCUMENTATION_URL_ATTRIBUTE_ID',
SDP_CLIENT_EXECUTABLE_URL_ATTRIBUTE_ID: 'SDP_CLIENT_EXECUTABLE_URL_ATTRIBUTE_ID',
SDP_ICON_URL_ATTRIBUTE_ID: 'SDP_ICON_URL_ATTRIBUTE_ID',
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: 'SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID'
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: 'SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID',
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: 'SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID',
}
SDP_PUBLIC_BROWSE_ROOT = core.UUID.from_16_bits(0x1002, 'PublicBrowseRoot')

View File

@@ -1134,8 +1134,10 @@ class Session:
async def get_link_key_and_derive_ltk(self) -> None:
'''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
if link_key is None:
self.link_key = await self.manager.device.get_link_key(
self.connection.peer_address
)
if self.link_key is None:
logging.warning(
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
)
@@ -1143,7 +1145,7 @@ class Session:
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
)
else:
self.ltk = self.derive_ltk(link_key, self.ct2)
self.ltk = self.derive_ltk(self.link_key, self.ct2)
def distribute_keys(self) -> None:
# Distribute the keys as required
@@ -1991,10 +1993,8 @@ class Manager(EventEmitter):
) -> None:
# Store the keys in the key store
if self.device.keystore and identity_address is not None:
self.device.abort_on(
'flush', self.device.update_keys(str(identity_address), keys)
)
# Make sure on_pairing emits after key update.
await self.device.update_keys(str(identity_address), keys)
# Notify the device
self.device.on_pairing(session.connection, identity_address, keys, session.sc)

View File

@@ -82,14 +82,13 @@ async def open_transport(name: str) -> Transport:
scheme, *tail = name.split(':', 1)
spec = tail[0] if tail else None
metadata = None
if spec:
# Metadata may precede the spec
if spec.startswith('['):
metadata_str, *tail = spec[1:].split(']')
spec = tail[0] if tail else None
metadata = dict([entry.split('=') for entry in metadata_str.split(',')])
else:
metadata = None
transport = await _open_transport(scheme, spec)
if metadata:
@@ -198,12 +197,13 @@ async def open_transport_or_link(name: str) -> Transport:
"""
if name.startswith('link-relay:'):
logger.warning('Link Relay has been deprecated.')
from ..controller import Controller
from ..link import RemoteLink # lazy import
link = RemoteLink(name[11:])
await link.wait_until_connected()
controller = Controller('remote', link=link)
controller = Controller('remote', link=link) # type:ignore[arg-type]
class LinkTransport(Transport):
async def close(self):

View File

@@ -108,7 +108,7 @@ async def open_usb_transport(spec: str) -> Transport:
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER,
)
READ_SIZE = 1024
READ_SIZE = 4096
class UsbPacketSink:
def __init__(self, device, acl_out):

View File

@@ -17,9 +17,10 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import traceback
import collections
import enum
import functools
import logging
import sys
import warnings
from typing import (
@@ -34,7 +35,7 @@ from typing import (
Union,
overload,
)
from functools import wraps, partial
from pyee import EventEmitter
from .colors import color
@@ -131,13 +132,14 @@ class EventWatcher:
Args:
emitter: EventEmitter to watch
event: Event name
handler: (Optional) Event handler. When nothing is passed, this method works as a decorator.
handler: (Optional) Event handler. When nothing is passed, this method
works as a decorator.
'''
def wrapper(f: _Handler) -> _Handler:
self.handlers.append((emitter, event, f))
emitter.on(event, f)
return f
def wrapper(wrapped: _Handler) -> _Handler:
self.handlers.append((emitter, event, wrapped))
emitter.on(event, wrapped)
return wrapped
return wrapper if handler is None else wrapper(handler)
@@ -157,13 +159,14 @@ class EventWatcher:
Args:
emitter: EventEmitter to watch
event: Event name
handler: (Optional) Event handler. When nothing passed, this method works as a decorator.
handler: (Optional) Event handler. When nothing passed, this method works
as a decorator.
'''
def wrapper(f: _Handler) -> _Handler:
self.handlers.append((emitter, event, f))
emitter.once(event, f)
return f
def wrapper(wrapped: _Handler) -> _Handler:
self.handlers.append((emitter, event, wrapped))
emitter.once(event, wrapped)
return wrapped
return wrapper if handler is None else wrapper(handler)
@@ -223,13 +226,13 @@ class CompositeEventEmitter(AbortableEventEmitter):
if self._listener:
# Call the deregistration methods for each base class that has them
for cls in self._listener.__class__.mro():
if hasattr(cls, '_bumble_register_composite'):
cls._bumble_deregister_composite(listener, self)
if '_bumble_register_composite' in cls.__dict__:
cls._bumble_deregister_composite(self._listener, self)
self._listener = listener
if listener:
# Call the registration methods for each base class that has them
for cls in listener.__class__.mro():
if hasattr(cls, '_bumble_deregister_composite'):
if '_bumble_deregister_composite' in cls.__dict__:
cls._bumble_register_composite(listener, self)
@@ -276,21 +279,18 @@ class AsyncRunner:
"""
def decorator(func):
@wraps(func)
@functools.wraps(func)
def wrapper(*args, **kwargs):
coroutine = func(*args, **kwargs)
if queue is None:
# Create a task to run the coroutine
# Spawn the coroutine as a task
async def run():
try:
await coroutine
except Exception:
logger.warning(
f'{color("!!! Exception in wrapper:", "red")} '
f'{traceback.format_exc()}'
)
logger.exception(color("!!! Exception in wrapper:", "red"))
asyncio.create_task(run())
AsyncRunner.spawn(run())
else:
# Queue the coroutine to be awaited by the work queue
queue.enqueue(coroutine)
@@ -413,30 +413,35 @@ class FlowControlAsyncPipe:
self.check_pump()
# -----------------------------------------------------------------------------
async def async_call(function, *args, **kwargs):
"""
Immediately calls the function with provided args and kwargs, wrapping it in an async function.
Rust's `pyo3_asyncio` library needs functions to be marked async to properly inject a running loop.
Immediately calls the function with provided args and kwargs, wrapping it in an
async function.
Rust's `pyo3_asyncio` library needs functions to be marked async to properly inject
a running loop.
result = await async_call(some_function, ...)
"""
return function(*args, **kwargs)
# -----------------------------------------------------------------------------
def wrap_async(function):
"""
Wraps the provided function in an async function.
"""
return partial(async_call, function)
return functools.partial(async_call, function)
# -----------------------------------------------------------------------------
def deprecated(msg: str):
"""
Throw deprecation warning before execution.
"""
def wrapper(function):
@wraps(function)
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, DeprecationWarning)
return function(*args, **kwargs)
@@ -446,13 +451,14 @@ def deprecated(msg: str):
return wrapper
# -----------------------------------------------------------------------------
def experimental(msg: str):
"""
Throws a future warning before execution.
"""
def wrapper(function):
@wraps(function)
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)
@@ -460,3 +466,22 @@ def experimental(msg: str):
return inner
return wrapper
# -----------------------------------------------------------------------------
class OpenIntEnum(enum.IntEnum):
"""
Subclass of enum.IntEnum that can hold integer values outside the set of
predefined values. This is convenient for implementing protocols where some
integer constants may be added over time.
"""
@classmethod
def _missing_(cls, value):
if not isinstance(value, int):
return None
obj = int.__new__(cls, value)
obj._value_ = value
obj._name_ = f"{cls.__name__}[{value}]"
return obj

View File

@@ -7,16 +7,36 @@ throughput and/or latency between two devices.
# General Usage
```
Usage: bench.py [OPTIONS] COMMAND [ARGS]...
Usage: bumble-bench [OPTIONS] COMMAND [ARGS]...
Options:
--device-config FILENAME Device configuration file
--role [sender|receiver|ping|pong]
--mode [gatt-client|gatt-server|l2cap-client|l2cap-server|rfcomm-client|rfcomm-server]
--att-mtu MTU GATT MTU (gatt-client mode) [23<=x<=517]
-s, --packet-size SIZE Packet size (server role) [8<=x<=4096]
-c, --packet-count COUNT Packet count (server role)
-sd, --start-delay SECONDS Start delay (server role)
--extended-data-length TEXT Request a data length upon connection,
specified as tx_octets/tx_time
--rfcomm-channel INTEGER RFComm channel to use
--rfcomm-uuid TEXT RFComm service UUID to use (ignored if
--rfcomm-channel is not 0)
--l2cap-psm INTEGER L2CAP PSM to use
--l2cap-mtu INTEGER L2CAP MTU to use
--l2cap-mps INTEGER L2CAP MPS to use
--l2cap-max-credits INTEGER L2CAP maximum number of credits allowed for
the peer
-s, --packet-size SIZE Packet size (client or ping role)
[8<=x<=4096]
-c, --packet-count COUNT Packet count (client or ping role)
-sd, --start-delay SECONDS Start delay (client or ping role)
--repeat N Repeat the run N times (client and ping
roles)(0, which is the fault, to run just
once)
--repeat-delay SECONDS Delay, in seconds, between repeats
--pace MILLISECONDS Wait N milliseconds between packets (0,
which is the fault, to send as fast as
possible)
--linger Don't exit at the end of a run (server and
pong roles)
--help Show this message and exit.
Commands:
@@ -35,17 +55,18 @@ Options:
--connection-interval, --ci CONNECTION_INTERVAL
Connection interval (in ms)
--phy [1m|2m|coded] PHY to use
--authenticate Authenticate (RFComm only)
--encrypt Encrypt the connection (RFComm only)
--help Show this message and exit.
```
To test once device against another, one of the two devices must be running
To test once device against another, one of the two devices must be running
the ``peripheral`` command and the other the ``central`` command. The device
running the ``peripheral`` command will accept connections from the device
running the ``central`` command.
When using Bluetooth LE (all modes except for ``rfcomm-server`` and ``rfcomm-client``utils),
the default addresses configured in the tool should be sufficient. But when using
Bluetooth Classic, the address of the Peripheral must be specified on the Central
the default addresses configured in the tool should be sufficient. But when using
Bluetooth Classic, the address of the Peripheral must be specified on the Central
using the ``--peripheral`` option. The address will be printed by the Peripheral when
it starts.
@@ -83,7 +104,7 @@ the other on `usb:1`, and two consoles/terminals. We will run a command in each.
$ bumble-bench central usb:1
```
In this default configuration, the Central runs a Sender, as a GATT client,
In this default configuration, the Central runs a Sender, as a GATT client,
connecting to the Peripheral running a Receiver, as a GATT server.
!!! example "L2CAP Throughput"

274
examples/avrcp_as_sink.html Normal file
View File

@@ -0,0 +1,274 @@
<html>
<head>
<style>
* {
font-family: sans-serif;
}
</style>
</head>
<body>
Server Port <input id="port" type="text" value="8989"></input> <button id="connectButton" onclick="connect()">Connect</button><br>
<div id="socketState"></div>
<br>
<div id="buttons"></div><br>
<hr>
<button onclick="onGetPlayStatusButtonClicked()">Get Play Status</button><br>
<div id="getPlayStatusResponseTable"></div>
<hr>
<button onclick="onGetElementAttributesButtonClicked()">Get Element Attributes</button><br>
<div id="getElementAttributesResponseTable"></div>
<hr>
<table>
<tr>
<b>VOLUME</b>:
<button onclick="onVolumeDownButtonClicked()">-</button>
<button onclick="onVolumeUpButtonClicked()">+</button>&nbsp;
<span id="volumeText"></span><br>
</tr>
<tr>
<td><b>PLAYBACK STATUS</b></td><td><span id="playbackStatusText"></span></td>
</tr>
<tr>
<td><b>POSITION</b></td><td><span id="positionText"></span></td>
</tr>
<tr>
<td><b>TRACK</b></td><td><span id="trackText"></span></td>
</tr>
<tr>
<td><b>ADDRESSED PLAYER</b></td><td><span id="addressedPlayerText"></span></td>
</tr>
<tr>
<td><b>UID COUNTER</b></td><td><span id="uidCounterText"></span></td>
</tr>
<tr>
<td><b>SUPPORTED EVENTS</b></td><td><span id="supportedEventsText"></span></td>
</tr>
<tr>
<td><b>PLAYER SETTINGS</b></td><td><div id="playerSettingsTable"></div></td>
</tr>
</table>
<script>
const portInput = document.getElementById("port")
const connectButton = document.getElementById("connectButton")
const socketState = document.getElementById("socketState")
const volumeText = document.getElementById("volumeText")
const positionText = document.getElementById("positionText")
const trackText = document.getElementById("trackText")
const playbackStatusText = document.getElementById("playbackStatusText")
const addressedPlayerText = document.getElementById("addressedPlayerText")
const uidCounterText = document.getElementById("uidCounterText")
const supportedEventsText = document.getElementById("supportedEventsText")
const playerSettingsTable = document.getElementById("playerSettingsTable")
const getPlayStatusResponseTable = document.getElementById("getPlayStatusResponseTable")
const getElementAttributesResponseTable = document.getElementById("getElementAttributesResponseTable")
let socket
let volume = 0
const keyNames = [
"SELECT",
"UP",
"DOWN",
"LEFT",
"RIGHT",
"RIGHT_UP",
"RIGHT_DOWN",
"LEFT_UP",
"LEFT_DOWN",
"ROOT_MENU",
"SETUP_MENU",
"CONTENTS_MENU",
"FAVORITE_MENU",
"EXIT",
"NUMBER_0",
"NUMBER_1",
"NUMBER_2",
"NUMBER_3",
"NUMBER_4",
"NUMBER_5",
"NUMBER_6",
"NUMBER_7",
"NUMBER_8",
"NUMBER_9",
"DOT",
"ENTER",
"CLEAR",
"CHANNEL_UP",
"CHANNEL_DOWN",
"PREVIOUS_CHANNEL",
"SOUND_SELECT",
"INPUT_SELECT",
"DISPLAY_INFORMATION",
"HELP",
"PAGE_UP",
"PAGE_DOWN",
"POWER",
"VOLUME_UP",
"VOLUME_DOWN",
"MUTE",
"PLAY",
"STOP",
"PAUSE",
"RECORD",
"REWIND",
"FAST_FORWARD",
"EJECT",
"FORWARD",
"BACKWARD",
"ANGLE",
"SUBPICTURE",
"F1",
"F2",
"F3",
"F4",
"F5",
]
document.addEventListener('keydown', onKeyDown)
document.addEventListener('keyup', onKeyUp)
const buttons = document.getElementById("buttons")
keyNames.forEach(name => {
const button = document.createElement("BUTTON")
button.appendChild(document.createTextNode(name))
button.addEventListener("mousedown", event => {
send({type: 'send-key-down', key: name})
})
button.addEventListener("mouseup", event => {
send({type: 'send-key-up', key: name})
})
buttons.appendChild(button)
})
updateVolume(0)
function connect() {
socket = new WebSocket(`ws://localhost:${portInput.value}`);
socket.onopen = _ => {
socketState.innerText = 'OPEN'
connectButton.disabled = true
}
socket.onclose = _ => {
socketState.innerText = 'CLOSED'
connectButton.disabled = false
}
socket.onerror = (error) => {
socketState.innerText = 'ERROR'
console.log(`ERROR: ${error}`)
connectButton.disabled = false
}
socket.onmessage = (message) => {
onMessage(JSON.parse(message.data))
}
}
function send(message) {
if (socket && socket.readyState == WebSocket.OPEN) {
socket.send(JSON.stringify(message))
}
}
function hmsText(position) {
const h_1 = 1000 * 60 * 60
const h = Math.floor(position / h_1)
position -= h * h_1
const m_1 = 1000 * 60
const m = Math.floor(position / m_1)
position -= m * m_1
const s_1 = 1000
const s = Math.floor(position / s_1)
position -= s * s_1
return `${h}:${m.toString().padStart(2, "0")}:${s.toString().padStart(2, "0")}:${position}`
}
function setTableHead(table, columns) {
let thead = table.createTHead()
let row = thead.insertRow()
for (let column of columns) {
let th = document.createElement("th")
let text = document.createTextNode(column)
th.appendChild(text)
row.appendChild(th)
}
}
function createTable(rows) {
const table = document.createElement("table")
if (rows.length != 0) {
columns = Object.keys(rows[0])
setTableHead(table, columns)
}
for (let element of rows) {
let row = table.insertRow()
for (key in element) {
let cell = row.insertCell()
let text = document.createTextNode(element[key])
cell.appendChild(text)
}
}
return table
}
function onMessage(message) {
console.log(message)
if (message.type == "set-volume") {
updateVolume(message.params.volume)
} else if (message.type == "supported-events") {
supportedEventsText.innerText = JSON.stringify(message.params.events)
} else if (message.type == "playback-position-changed") {
positionText.innerText = hmsText(message.params.position)
} else if (message.type == "playback-status-changed") {
playbackStatusText.innerText = message.params.status
} else if (message.type == "player-settings-changed") {
playerSettingsTable.replaceChildren(message.params.settings)
} else if (message.type == "track-changed") {
trackText.innerText = message.params.identifier
} else if (message.type == "addressed-player-changed") {
addressedPlayerText.innerText = JSON.stringify(message.params.player)
} else if (message.type == "uids-changed") {
uidCounterText.innerText = message.params.uid_counter
} else if (message.type == "get-play-status-response") {
getPlayStatusResponseTable.replaceChildren(message.params)
} else if (message.type == "get-element-attributes-response") {
getElementAttributesResponseTable.replaceChildren(createTable(message.params))
}
}
function updateVolume(newVolume) {
volume = newVolume
volumeText.innerText = `${volume} (${Math.round(100*volume/0x7F)}%)`
}
function onKeyDown(event) {
console.log(event)
send({ type: 'send-key-down', key: event.key })
}
function onKeyUp(event) {
console.log(event)
send({ type: 'send-key-up', key: event.key })
}
function onVolumeUpButtonClicked() {
updateVolume(Math.min(volume + 5, 0x7F))
send({ type: 'set-volume', volume })
}
function onVolumeDownButtonClicked() {
updateVolume(Math.max(volume - 5, 0))
send({ type: 'set-volume', volume })
}
function onGetPlayStatusButtonClicked() {
send({ type: 'get-play-status', volume })
}
function onGetElementAttributesButtonClicked() {
send({ type: 'get-element-attributes' })
}
</script>
</body>
</html>

View File

@@ -19,9 +19,11 @@ import asyncio
import logging
import sys
import os
import struct
from bumble.core import AdvertisingData
from bumble.device import AdvertisingType, Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link
@@ -52,6 +54,16 @@ async def main():
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
if advertising_type.is_scannable:
device.scan_response_data = bytes(
AdvertisingData(
[
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340)),
]
)
)
await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()

408
examples/run_avrcp.py Normal file
View File

@@ -0,0 +1,408 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import json
import sys
import os
import logging
import websockets
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble import avc
from bumble import avrcp
from bumble import avdtp
from bumble import a2dp
from bumble import utils
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def sdp_records():
a2dp_sink_service_record_handle = 0x00010001
avrcp_controller_service_record_handle = 0x00010002
avrcp_target_service_record_handle = 0x00010003
# pylint: disable=line-too-long
return {
a2dp_sink_service_record_handle: a2dp.make_audio_sink_service_sdp_records(
a2dp_sink_service_record_handle
),
avrcp_controller_service_record_handle: avrcp.make_controller_service_sdp_records(
avrcp_controller_service_record_handle
),
avrcp_target_service_record_handle: avrcp.make_target_service_sdp_records(
avrcp_controller_service_record_handle
),
}
# -----------------------------------------------------------------------------
def codec_capabilities():
return avdtp.MediaCodecCapabilities(
media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=a2dp.A2DP_SBC_CODEC_TYPE,
media_codec_information=a2dp.SbcMediaCodecInformation.from_lists(
sampling_frequencies=[48000, 44100, 32000, 16000],
channel_modes=[
a2dp.SBC_MONO_CHANNEL_MODE,
a2dp.SBC_DUAL_CHANNEL_MODE,
a2dp.SBC_STEREO_CHANNEL_MODE,
a2dp.SBC_JOINT_STEREO_CHANNEL_MODE,
],
block_lengths=[4, 8, 12, 16],
subbands=[4, 8],
allocation_methods=[
a2dp.SBC_LOUDNESS_ALLOCATION_METHOD,
a2dp.SBC_SNR_ALLOCATION_METHOD,
],
minimum_bitpool_value=2,
maximum_bitpool_value=53,
),
)
# -----------------------------------------------------------------------------
def on_avdtp_connection(server):
# Add a sink endpoint to the server
sink = server.add_sink(codec_capabilities())
sink.on('rtp_packet', on_rtp_packet)
# -----------------------------------------------------------------------------
def on_rtp_packet(packet):
print(f'RTP: {packet}')
# -----------------------------------------------------------------------------
def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketServer):
async def get_supported_events():
events = await avrcp_protocol.get_supported_events()
print("SUPPORTED EVENTS:", events)
websocket_server.send_message(
{
"type": "supported-events",
"params": {"events": [event.name for event in events]},
}
)
if avrcp.EventId.TRACK_CHANGED in events:
utils.AsyncRunner.spawn(monitor_track_changed())
if avrcp.EventId.PLAYBACK_STATUS_CHANGED in events:
utils.AsyncRunner.spawn(monitor_playback_status())
if avrcp.EventId.PLAYBACK_POS_CHANGED in events:
utils.AsyncRunner.spawn(monitor_playback_position())
if avrcp.EventId.PLAYER_APPLICATION_SETTING_CHANGED in events:
utils.AsyncRunner.spawn(monitor_player_application_settings())
if avrcp.EventId.AVAILABLE_PLAYERS_CHANGED in events:
utils.AsyncRunner.spawn(monitor_available_players())
if avrcp.EventId.ADDRESSED_PLAYER_CHANGED in events:
utils.AsyncRunner.spawn(monitor_addressed_player())
if avrcp.EventId.UIDS_CHANGED in events:
utils.AsyncRunner.spawn(monitor_uids())
if avrcp.EventId.VOLUME_CHANGED in events:
utils.AsyncRunner.spawn(monitor_volume())
utils.AsyncRunner.spawn(get_supported_events())
async def monitor_track_changed():
async for identifier in avrcp_protocol.monitor_track_changed():
print("TRACK CHANGED:", identifier.hex())
websocket_server.send_message(
{"type": "track-changed", "params": {"identifier": identifier.hex()}}
)
async def monitor_playback_status():
async for playback_status in avrcp_protocol.monitor_playback_status():
print("PLAYBACK STATUS CHANGED:", playback_status.name)
websocket_server.send_message(
{
"type": "playback-status-changed",
"params": {"status": playback_status.name},
}
)
async def monitor_playback_position():
async for playback_position in avrcp_protocol.monitor_playback_position(
playback_interval=1
):
print("PLAYBACK POSITION CHANGED:", playback_position)
websocket_server.send_message(
{
"type": "playback-position-changed",
"params": {"position": playback_position},
}
)
async def monitor_player_application_settings():
async for settings in avrcp_protocol.monitor_player_application_settings():
print("PLAYER APPLICATION SETTINGS:", settings)
settings_as_dict = [
{"attribute": setting.attribute_id.name, "value": setting.value_id.name}
for setting in settings
]
websocket_server.send_message(
{
"type": "player-settings-changed",
"params": {"settings": settings_as_dict},
}
)
async def monitor_available_players():
async for _ in avrcp_protocol.monitor_available_players():
print("AVAILABLE PLAYERS CHANGED")
websocket_server.send_message(
{"type": "available-players-changed", "params": {}}
)
async def monitor_addressed_player():
async for player in avrcp_protocol.monitor_addressed_player():
print("ADDRESSED PLAYER CHANGED")
websocket_server.send_message(
{
"type": "addressed-player-changed",
"params": {
"player": {
"player_id": player.player_id,
"uid_counter": player.uid_counter,
}
},
}
)
async def monitor_uids():
async for uid_counter in avrcp_protocol.monitor_uids():
print("UIDS CHANGED")
websocket_server.send_message(
{
"type": "uids-changed",
"params": {
"uid_counter": uid_counter,
},
}
)
async def monitor_volume():
async for volume in avrcp_protocol.monitor_volume():
print("VOLUME CHANGED:", volume)
websocket_server.send_message(
{"type": "volume-changed", "params": {"volume": volume}}
)
# -----------------------------------------------------------------------------
class WebSocketServer:
def __init__(
self, avrcp_protocol: avrcp.Protocol, avrcp_delegate: Delegate
) -> None:
self.socket = None
self.delegate = None
self.avrcp_protocol = avrcp_protocol
self.avrcp_delegate = avrcp_delegate
async def start(self) -> None:
# pylint: disable-next=no-member
await websockets.serve(self.serve, 'localhost', 8989) # type: ignore
async def serve(self, socket, _path) -> None:
print('### WebSocket connected')
self.socket = socket
while True:
try:
message = await socket.recv()
print('Received: ', str(message))
parsed = json.loads(message)
message_type = parsed['type']
if message_type == 'send-key-down':
await self.on_send_key_down(parsed)
elif message_type == 'send-key-up':
await self.on_send_key_up(parsed)
elif message_type == 'set-volume':
await self.on_set_volume(parsed)
elif message_type == 'get-play-status':
await self.on_get_play_status()
elif message_type == 'get-element-attributes':
await self.on_get_element_attributes()
except websockets.exceptions.ConnectionClosedOK:
self.socket = None
break
async def on_send_key_down(self, message: dict) -> None:
key = avc.PassThroughFrame.OperationId[message["key"]]
await self.avrcp_protocol.send_key_event(key, True)
async def on_send_key_up(self, message: dict) -> None:
key = avc.PassThroughFrame.OperationId[message["key"]]
await self.avrcp_protocol.send_key_event(key, False)
async def on_set_volume(self, message: dict) -> None:
volume = message["volume"]
self.avrcp_delegate.volume = volume
self.avrcp_protocol.notify_volume_changed(volume)
async def on_get_play_status(self) -> None:
play_status = await self.avrcp_protocol.get_play_status()
self.send_message(
{
"type": "get-play-status-response",
"params": {
"song_length": play_status.song_length,
"song_position": play_status.song_position,
"play_status": play_status.play_status.name,
},
}
)
async def on_get_element_attributes(self) -> None:
attributes = await self.avrcp_protocol.get_element_attributes(
0,
[
avrcp.MediaAttributeId.TITLE,
avrcp.MediaAttributeId.ARTIST_NAME,
avrcp.MediaAttributeId.ALBUM_NAME,
avrcp.MediaAttributeId.TRACK_NUMBER,
avrcp.MediaAttributeId.TOTAL_NUMBER_OF_TRACKS,
avrcp.MediaAttributeId.GENRE,
avrcp.MediaAttributeId.PLAYING_TIME,
avrcp.MediaAttributeId.DEFAULT_COVER_ART,
],
)
self.send_message(
{
"type": "get-element-attributes-response",
"params": [
{
"attribute_id": attribute.attribute_id.name,
"attribute_value": attribute.attribute_value,
}
for attribute in attributes
],
}
)
def send_message(self, message: dict) -> None:
if self.socket is None:
print("no socket, dropping message")
return
serialized = json.dumps(message)
utils.AsyncRunner.spawn(self.socket.send(serialized))
# -----------------------------------------------------------------------------
class Delegate(avrcp.Delegate):
def __init__(self):
super().__init__(
[avrcp.EventId.VOLUME_CHANGED, avrcp.EventId.PLAYBACK_STATUS_CHANGED]
)
self.websocket_server = None
async def set_absolute_volume(self, volume: int) -> None:
await super().set_absolute_volume(volume)
if self.websocket_server is not None:
self.websocket_server.send_message(
{"type": "set-volume", "params": {"volume": volume}}
)
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) < 3:
print(
'Usage: run_avrcp_controller.py <device-config> <transport-spec> '
'<sbc-file> [<bt-addr>]'
)
print('example: run_avrcp_controller.py classic1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device.classic_enabled = True
# Setup the SDP to expose the sink service
device.sdp_service_records = sdp_records()
# Start the controller
await device.power_on()
# Create a listener to wait for AVDTP connections
listener = avdtp.Listener(avdtp.Listener.create_registrar(device))
listener.on('connection', on_avdtp_connection)
avrcp_delegate = Delegate()
avrcp_protocol = avrcp.Protocol(avrcp_delegate)
avrcp_protocol.listen(device)
websocket_server = WebSocketServer(avrcp_protocol, avrcp_delegate)
avrcp_delegate.websocket_server = websocket_server
avrcp_protocol.on(
"start", lambda: on_avrcp_start(avrcp_protocol, websocket_server)
)
await websocket_server.start()
if len(sys.argv) >= 5:
# Connect to the peer
target_address = sys.argv[4]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(
target_address, transport=BT_BR_EDR_TRANSPORT
)
print(f'=== Connected to {connection.peer_address}!')
# Request authentication
print('*** Authenticating...')
await connection.authenticate()
print('*** Authenticated')
# Enable encryption
print('*** Enabling encryption...')
await connection.encrypt()
print('*** Encryption on')
server = await avdtp.Protocol.connect(connection)
listener.set_server(connection, server)
sink = server.add_sink(codec_capabilities())
sink.on('rtp_packet', on_rtp_packet)
await avrcp_protocol.connect(connection)
else:
# Start being discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
await asyncio.get_event_loop().create_future()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -22,10 +22,11 @@ import os
from bumble.device import (
Device,
Connection,
AdvertisingParameters,
AdvertisingEventProperties,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
@@ -61,12 +62,7 @@ async def main() -> None:
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
advertising_set = await devices[0].create_advertising_set()
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC

View File

@@ -98,13 +98,7 @@ async def main() -> None:
)
+ csis.get_advertising_data()
)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await device.create_advertising_set(advertising_data=advertising_data)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]

View File

@@ -19,8 +19,13 @@ import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
from bumble.device import (
AdvertisingParameters,
AdvertisingEventProperties,
AdvertisingType,
Device,
)
from bumble.hci import Address
from bumble.transport import open_transport_or_link
@@ -35,20 +40,16 @@ async def main() -> None:
return
if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
advertising_properties = AdvertisingEventProperties.from_advertising_type(
AdvertisingType(int(sys.argv[3]))
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
advertising_properties = AdvertisingEventProperties()
if len(sys.argv) >= 5:
target = Address(sys.argv[4])
peer_address = Address(sys.argv[4])
else:
target = Address.ANY
peer_address = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
@@ -58,8 +59,11 @@ async def main() -> None:
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=advertising_properties,
peer_address=peer_address,
)
)
await hci_transport.source.terminated

View File

@@ -0,0 +1,99 @@
# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingParameters, AdvertisingEventProperties, Device
from bumble.hci import Address
from bumble.core import AdvertisingData
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_extended_advertiser_2.py <config-file> <transport-spec>')
print('example: run_extended_advertiser_2.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
if not device.supports_le_extended_advertising:
print("Device does not support extended advertising")
return
print("Max advertising sets:", device.host.number_of_supported_advertising_sets)
print(
"Max advertising data length:", device.host.maximum_advertising_data_length
)
if device.host.number_of_supported_advertising_sets >= 1:
advertising_data1 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 1".encode("utf-8"))]
)
set1 = await device.create_advertising_set(
advertising_data=bytes(advertising_data1),
)
print("Selected TX power 1:", set1.selected_tx_power)
advertising_data2 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))]
)
if device.host.number_of_supported_advertising_sets >= 2:
set2 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F1"),
advertising_parameters=AdvertisingParameters(),
advertising_data=bytes(advertising_data2),
auto_start=False,
auto_restart=True,
)
print("Selected TX power 2:", set2.selected_tx_power)
await set2.start()
if device.host.number_of_supported_advertising_sets >= 3:
scan_response_data3 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 3".encode("utf-8"))]
)
set3 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F2"),
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False, is_scannable=True
)
),
scan_response_data=bytes(scan_response_data3),
)
print("Selected TX power 3:", set2.selected_tx_power)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -590,12 +590,12 @@ async def main():
def on_set_protocol_cb(protocol: int):
retValue = hid_device.GetSetStatus()
# We do not support SET_PROTOCOL.
print("SET_PROTOCOL report_id: " + str(protocol))
print(f"SET_PROTOCOL report_id: {protocol}")
retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST
return retValue
def on_virtual_cable_unplug_cb():
print(f'Received Virtual Cable Unplug')
print('Received Virtual Cable Unplug')
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')

View File

@@ -22,13 +22,12 @@ import os
import struct
import secrets
from bumble.core import AdvertisingData
from bumble.device import Device, CisLink
from bumble.device import Device, CisLink, AdvertisingParameters
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
HCI_IsoDataPacket,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.bap import (
CodecSpecificCapabilities,
@@ -179,11 +178,7 @@ async def main() -> None:
device.once('cis_establishment', on_cis)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_set = await device.create_advertising_set(
advertising_data=advertising_data,
)

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:tools="http://schemas.android.com/tools">
package="com.github.google.bumble.btbench">
<uses-sdk android:minSdkVersion="30" android:targetSdkVersion="34" />
<!-- Request legacy Bluetooth permissions on older devices. -->
<uses-permission android:name="android.permission.BLUETOOTH" android:maxSdkVersion="30" />
<uses-permission android:name="android.permission.BLUETOOTH_ADMIN" android:maxSdkVersion="30" />
@@ -22,11 +22,10 @@
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/Theme.BTBench"
tools:targetApi="31">
>
<activity
android:name=".MainActivity"
android:exported="true"
android:label="@string/app_name"
android:theme="@style/Theme.BTBench">
<intent-filter>
<action android:name="android.intent.action.MAIN" />

View File

@@ -28,8 +28,8 @@ private val Log = Logger.getLogger("btbench.l2cap-client")
class L2capClient(
private val viewModel: AppViewModel,
val bluetoothAdapter: BluetoothAdapter,
val context: Context
private val bluetoothAdapter: BluetoothAdapter,
private val context: Context
) {
@SuppressLint("MissingPermission")
fun run() {
@@ -74,12 +74,18 @@ class L2capClient(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
if (viewModel.use2mPhy) {
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
}
gatt.readPhy()
// Request an MTU update, even though we don't use GATT, because Android
// won't request a larger link layer maximum data length otherwise.
gatt.requestMtu(517)
}
}
},

View File

@@ -23,19 +23,20 @@ import androidx.compose.runtime.setValue
import androidx.lifecycle.ViewModel
import java.util.UUID
val DEFAULT_RFCOMM_UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF6D3AE")
val DEFAULT_RFCOMM_UUID: UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF6D3AE")
const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF"
const val DEFAULT_SENDER_PACKET_COUNT = 100
const val DEFAULT_SENDER_PACKET_SIZE = 1024
const val DEFAULT_PSM = 128
class AppViewModel : ViewModel() {
private var preferences: SharedPreferences? = null
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var l2capPsm by mutableStateOf(0)
var l2capPsm by mutableIntStateOf(DEFAULT_PSM)
var use2mPhy by mutableStateOf(true)
var mtu by mutableStateOf(0)
var rxPhy by mutableStateOf(0)
var txPhy by mutableStateOf(0)
var mtu by mutableIntStateOf(0)
var rxPhy by mutableIntStateOf(0)
var txPhy by mutableIntStateOf(0)
var senderPacketCountSlider by mutableFloatStateOf(0.0F)
var senderPacketSizeSlider by mutableFloatStateOf(0.0F)
var senderPacketCount by mutableIntStateOf(DEFAULT_SENDER_PACKET_COUNT)
@@ -79,18 +80,18 @@ class AppViewModel : ViewModel() {
}
fun updateSenderPacketCountSlider() {
if (senderPacketCount <= 10) {
senderPacketCountSlider = 0.0F
senderPacketCountSlider = if (senderPacketCount <= 10) {
0.0F
} else if (senderPacketCount <= 50) {
senderPacketCountSlider = 0.2F
0.2F
} else if (senderPacketCount <= 100) {
senderPacketCountSlider = 0.4F
0.4F
} else if (senderPacketCount <= 500) {
senderPacketCountSlider = 0.6F
0.6F
} else if (senderPacketCount <= 1000) {
senderPacketCountSlider = 0.8F
0.8F
} else {
senderPacketCountSlider = 1.0F
1.0F
}
with(preferences!!.edit()) {
@@ -100,18 +101,18 @@ class AppViewModel : ViewModel() {
}
fun updateSenderPacketCount() {
if (senderPacketCountSlider < 0.1F) {
senderPacketCount = 10
senderPacketCount = if (senderPacketCountSlider < 0.1F) {
10
} else if (senderPacketCountSlider < 0.3F) {
senderPacketCount = 50
50
} else if (senderPacketCountSlider < 0.5F) {
senderPacketCount = 100
100
} else if (senderPacketCountSlider < 0.7F) {
senderPacketCount = 500
500
} else if (senderPacketCountSlider < 0.9F) {
senderPacketCount = 1000
1000
} else {
senderPacketCount = 10000
10000
}
with(preferences!!.edit()) {
@@ -121,18 +122,18 @@ class AppViewModel : ViewModel() {
}
fun updateSenderPacketSizeSlider() {
if (senderPacketSize <= 16) {
senderPacketSizeSlider = 0.0F
senderPacketSizeSlider = if (senderPacketSize <= 16) {
0.0F
} else if (senderPacketSize <= 256) {
senderPacketSizeSlider = 0.02F
0.02F
} else if (senderPacketSize <= 512) {
senderPacketSizeSlider = 0.4F
0.4F
} else if (senderPacketSize <= 1024) {
senderPacketSizeSlider = 0.6F
0.6F
} else if (senderPacketSize <= 2048) {
senderPacketSizeSlider = 0.8F
0.8F
} else {
senderPacketSizeSlider = 1.0F
1.0F
}
with(preferences!!.edit()) {
@@ -142,18 +143,18 @@ class AppViewModel : ViewModel() {
}
fun updateSenderPacketSize() {
if (senderPacketSizeSlider < 0.1F) {
senderPacketSize = 16
senderPacketSize = if (senderPacketSizeSlider < 0.1F) {
16
} else if (senderPacketSizeSlider < 0.3F) {
senderPacketSize = 256
256
} else if (senderPacketSizeSlider < 0.5F) {
senderPacketSize = 512
512
} else if (senderPacketSizeSlider < 0.7F) {
senderPacketSize = 1024
1024
} else if (senderPacketSizeSlider < 0.9F) {
senderPacketSize = 2048
2048
} else {
senderPacketSize = 4096
4096
}
with(preferences!!.edit()) {

View File

@@ -10,7 +10,7 @@ android {
defaultConfig {
applicationId = "com.github.google.bumble.remotehci"
minSdk = 26
minSdk = 29
targetSdk = 33
versionCode = 1
versionName = "1.0"

View File

@@ -4,6 +4,7 @@ import android.hardware.bluetooth.V1_0.Status;
import android.os.IBinder;
import android.os.RemoteException;
import android.os.ServiceManager;
import android.os.Trace;
import android.util.Log;
import java.util.ArrayList;
@@ -53,6 +54,7 @@ class HciHidlHal extends android.hardware.bluetooth.V1_0.IBluetoothHciCallbacks.
private final android.hardware.bluetooth.V1_0.IBluetoothHci mHciService;
private final HciHalCallback mHciCallbacks;
private int mInitializationStatus = -1;
private final boolean mTracingEnabled = Trace.isEnabled();
public static HciHidlHal create(HciHalCallback hciCallbacks) {
@@ -89,6 +91,7 @@ class HciHidlHal extends android.hardware.bluetooth.V1_0.IBluetoothHciCallbacks.
}
// Map the status code.
Log.d(TAG, "Initialization status = " + mInitializationStatus);
switch (mInitializationStatus) {
case android.hardware.bluetooth.V1_0.Status.SUCCESS:
return Status.SUCCESS;
@@ -108,6 +111,10 @@ class HciHidlHal extends android.hardware.bluetooth.V1_0.IBluetoothHciCallbacks.
public void sendPacket(HciPacket.Type type, byte[] packet) {
ArrayList<Byte> data = HciPacket.byteArrayToList(packet);
if (mTracingEnabled) {
Trace.beginAsyncSection("SEND_PACKET_TO_HAL", 1);
}
try {
switch (type) {
case COMMAND:
@@ -125,6 +132,10 @@ class HciHidlHal extends android.hardware.bluetooth.V1_0.IBluetoothHciCallbacks.
} catch (RemoteException error) {
Log.w(TAG, "failed to forward packet: " + error);
}
if (mTracingEnabled) {
Trace.endAsyncSection("SEND_PACKET_TO_HAL", 1);
}
}
@Override
@@ -157,6 +168,7 @@ class HciAidlHal extends android.hardware.bluetooth.IBluetoothHciCallbacks.Stub
private final android.hardware.bluetooth.IBluetoothHci mHciService;
private final HciHalCallback mHciCallbacks;
private int mInitializationStatus = android.hardware.bluetooth.Status.SUCCESS;
private final boolean mTracingEnabled = Trace.isEnabled();
public static HciAidlHal create(HciHalCallback hciCallbacks) {
IBinder binder = ServiceManager.getService("android.hardware.bluetooth.IBluetoothHci/default");
@@ -187,6 +199,7 @@ class HciAidlHal extends android.hardware.bluetooth.IBluetoothHciCallbacks.Stub
}
// Map the status code.
Log.d(TAG, "Initialization status = " + mInitializationStatus);
switch (mInitializationStatus) {
case android.hardware.bluetooth.Status.SUCCESS:
return Status.SUCCESS;
@@ -208,6 +221,10 @@ class HciAidlHal extends android.hardware.bluetooth.IBluetoothHciCallbacks.Stub
// HciHal methods.
@Override
public void sendPacket(HciPacket.Type type, byte[] packet) {
if (mTracingEnabled) {
Trace.beginAsyncSection("SEND_PACKET_TO_HAL", 1);
}
try {
switch (type) {
case COMMAND:
@@ -229,6 +246,10 @@ class HciAidlHal extends android.hardware.bluetooth.IBluetoothHciCallbacks.Stub
} catch (RemoteException error) {
Log.w(TAG, "failed to forward packet: " + error);
}
if (mTracingEnabled) {
Trace.endAsyncSection("SEND_PACKET_TO_HAL", 1);
}
}
// IBluetoothHciCallbacks methods.

View File

@@ -1,5 +1,6 @@
package com.github.google.bumble.remotehci;
import android.os.Trace;
import android.util.Log;
import java.io.IOException;
@@ -15,6 +16,7 @@ public class HciServer {
private final int mPort;
private final Listener mListener;
private OutputStream mOutputStream;
private final boolean mTracingEnabled = Trace.isEnabled();
public interface Listener extends HciParser.Sink {
void onHostConnectionState(boolean connected);
@@ -27,6 +29,8 @@ public class HciServer {
}
public void run() throws IOException {
Log.i(TAG, "Tracing enabled: " + mTracingEnabled);
for (;;) {
try {
loop();
@@ -42,6 +46,7 @@ public class HciServer {
try (ServerSocket serverSocket = new ServerSocket(mPort)) {
mListener.onMessage("Waiting for connection on port " + serverSocket.getLocalPort());
try (Socket clientSocket = serverSocket.accept()) {
clientSocket.setTcpNoDelay(true);
mListener.onHostConnectionState(true);
mListener.onMessage("Connected");
HciParser parser = new HciParser(mListener);
@@ -72,6 +77,10 @@ public class HciServer {
}
public void sendPacket(HciPacket.Type type, byte[] packet) {
if (mTracingEnabled) {
Trace.beginAsyncSection("SEND_PACKET_FROM_HAL", 2);
}
// Create a combined data buffer so we can write it out in a single call.
byte[] data = new byte[packet.length + 1];
data[0] = type.value;
@@ -88,5 +97,9 @@ public class HciServer {
Log.d(TAG, "no client, dropping packet");
}
}
if (mTracingEnabled) {
Trace.endAsyncSection("SEND_PACKET_FROM_HAL", 2);
}
}
}

View File

@@ -59,6 +59,7 @@ console_scripts =
bumble-ble-rpa-tool = bumble.apps.ble_rpa_tool:main
bumble-console = bumble.apps.console:main
bumble-controller-info = bumble.apps.controller_info:main
bumble-controller-loopback = bumble.apps.controller_loopback:main
bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
@@ -81,15 +82,15 @@ console_scripts =
build =
build >= 0.7
test =
pytest >= 6.2
pytest-asyncio >= 0.17
pytest >= 8.0
pytest-asyncio == 0.21.1
pytest-html >= 3.2.0
coverage >= 6.4
development =
black == 22.10
grpcio-tools >= 1.57.0
invoke >= 1.7.3
mypy == 1.5.0
mypy == 1.8.0
nox >= 2022
pylint == 2.15.8
pyyaml >= 6.0
@@ -98,7 +99,7 @@ development =
types-protobuf >= 4.21.0
avatar =
pandora-avatar == 0.0.5
rootcanal == 1.3.0 ; python_version>='3.10'
rootcanal == 1.7.0 ; python_version>='3.10'
documentation =
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6

246
tests/avrcp_test.py Normal file
View File

@@ -0,0 +1,246 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import struct
import pytest
from bumble import core
from bumble import device
from bumble import host
from bumble import controller
from bumble import link
from bumble import avc
from bumble import avrcp
from bumble import avctp
from bumble.transport import common
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.link = link.LocalLink()
self.controllers = [
controller.Controller('C1', link=self.link, public_address=addresses[0]),
controller.Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
device.Device(
address=addresses[0],
host=host.Host(
self.controllers[0], common.AsyncPipeSink(self.controllers[0])
),
),
device.Device(
address=addresses[1],
host=host.Host(
self.controllers[1], common.AsyncPipeSink(self.controllers[1])
),
),
]
self.devices[0].classic_enabled = True
self.devices[1].classic_enabled = True
self.connections = [None, None]
self.protocols = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
async def setup_connections(self):
await self.devices[0].power_on()
await self.devices[1].power_on()
self.connections = await asyncio.gather(
self.devices[0].connect(
self.devices[1].public_address, core.BT_BR_EDR_TRANSPORT
),
self.devices[1].accept(self.devices[0].public_address),
)
self.protocols = [avrcp.Protocol(), avrcp.Protocol()]
self.protocols[0].listen(self.devices[1])
await self.protocols[1].connect(self.connections[0])
# -----------------------------------------------------------------------------
def test_frame_parser():
with pytest.raises(ValueError) as error:
avc.Frame.from_bytes(bytes.fromhex("11480000"))
x = bytes.fromhex("014D0208")
frame = avc.Frame.from_bytes(x)
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 7
assert frame.opcode == 8
x = bytes.fromhex("014DFF0108")
frame = avc.Frame.from_bytes(x)
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 260
assert frame.opcode == 8
x = bytes.fromhex("0148000019581000000103")
frame = avc.Frame.from_bytes(x)
assert isinstance(frame, avc.CommandFrame)
assert frame.ctype == avc.CommandFrame.CommandType.STATUS
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 0
assert frame.opcode == 0
# -----------------------------------------------------------------------------
def test_vendor_dependent_command():
x = bytes.fromhex("0148000019581000000103")
frame = avc.Frame.from_bytes(x)
assert isinstance(frame, avc.VendorDependentCommandFrame)
assert frame.company_id == 0x1958
assert frame.vendor_dependent_data == bytes.fromhex("1000000103")
frame = avc.VendorDependentCommandFrame(
avc.CommandFrame.CommandType.STATUS,
avc.Frame.SubunitType.PANEL,
0,
0x1958,
bytes.fromhex("1000000103"),
)
assert bytes(frame) == x
# -----------------------------------------------------------------------------
def test_avctp_message_assembler():
received_message = []
def on_message(transaction_label, is_response, ipid, pid, payload):
received_message.append((transaction_label, is_response, ipid, pid, payload))
assembler = avctp.MessageAssembler(on_message)
payload = bytes.fromhex("01")
assembler.on_pdu(bytes([1 << 4 | 0b00 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
received_message = []
payload = bytes.fromhex("010203")
assembler.on_pdu(bytes([1 << 4 | 0b01 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert len(received_message) == 0
assembler.on_pdu(bytes([1 << 4 | 0b00 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
received_message = []
payload = bytes.fromhex("010203")
assembler.on_pdu(
bytes([1 << 4 | 0b01 << 2 | 1 << 1 | 0, 3, 0x11, 0x22]) + payload[0:1]
)
assembler.on_pdu(
bytes([1 << 4 | 0b10 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload[1:2]
)
assembler.on_pdu(
bytes([1 << 4 | 0b11 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload[2:3]
)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
# received_message = []
# parameter = bytes.fromhex("010203")
# assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, len(parameter)) + parameter)
# assert len(received_message) == 0
# -----------------------------------------------------------------------------
def test_avrcp_pdu_assembler():
received_pdus = []
def on_pdu(pdu_id, parameter):
received_pdus.append((pdu_id, parameter))
assembler = avrcp.PduAssembler(on_pdu)
parameter = bytes.fromhex("01")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b00, len(parameter)) + parameter)
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b01, len(parameter)) + parameter)
assert len(received_pdus) == 0
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b00, len(parameter)) + parameter)
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b01, 1) + parameter[0:1])
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b10, 1) + parameter[1:2])
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, 1) + parameter[2:3])
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, len(parameter)) + parameter)
assert len(received_pdus) == 0
def test_passthrough_commands():
play_pressed = avc.PassThroughCommandFrame(
avc.CommandFrame.CommandType.CONTROL,
avc.CommandFrame.SubunitType.PANEL,
0,
avc.PassThroughCommandFrame.StateFlag.PRESSED,
avc.PassThroughCommandFrame.OperationId.PLAY,
b'',
)
play_pressed_bytes = bytes(play_pressed)
parsed = avc.Frame.from_bytes(play_pressed_bytes)
assert isinstance(parsed, avc.PassThroughCommandFrame)
assert parsed.operation_id == avc.PassThroughCommandFrame.OperationId.PLAY
assert bytes(parsed) == play_pressed_bytes
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_supported_events():
two_devices = TwoDevices()
await two_devices.setup_connections()
supported_events = await two_devices.protocols[0].get_supported_events()
assert supported_events == []
delegate1 = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED])
two_devices.protocols[0].delegate = delegate1
supported_events = await two_devices.protocols[1].get_supported_events()
assert supported_events == [avrcp.EventId.VOLUME_CHANGED]
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_frame_parser()
test_vendor_dependent_command()
test_avctp_message_assembler()
test_avrcp_pdu_assembler()
test_passthrough_commands()
test_get_supported_events()

View File

@@ -48,7 +48,8 @@ from bumble.profiles.bap import (
PublishedAudioCapabilitiesService,
PublishedAudioCapabilitiesServiceProxy,
)
from .test_utils import TwoDevices
from tests.test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging

View File

@@ -20,6 +20,7 @@ import os
import pytest
import struct
import logging
from unittest import mock
from bumble import device
from bumble.profiles import csip
@@ -68,14 +69,18 @@ def test_sef():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_csis():
@pytest.mark.parametrize(
'sirk_type,', [(csip.SirkType.ENCRYPTED), (csip.SirkType.PLAINTEXT)]
)
async def test_csis(sirk_type):
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
LTK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
devices = TwoDevices()
devices[0].add_service(
csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK,
set_identity_resolving_key_type=csip.SirkType.PLAINTEXT,
set_identity_resolving_key_type=sirk_type,
coordinated_set_size=2,
set_member_lock=csip.MemberLock.UNLOCKED,
set_member_rank=0,
@@ -83,15 +88,19 @@ async def test_csis():
)
await devices.setup_connection()
# Mock encryption.
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
devices[0].get_long_term_key = mock.AsyncMock(return_value=LTK)
devices[1].get_long_term_key = mock.AsyncMock(return_value=LTK)
peer = device.Peer(devices.connections[1])
csis_client = await peer.discover_service_and_create_proxy(
csip.CoordinatedSetIdentificationProxy
)
assert (
await csis_client.set_identity_resolving_key.read_value()
== bytes([csip.SirkType.PLAINTEXT]) + SIRK
)
assert await csis_client.read_set_identity_resolving_key() == (sirk_type, SIRK)
assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2)
assert await csis_client.set_member_lock.read_value() == struct.pack(
'B', csip.MemberLock.UNLOCKED

View File

@@ -28,8 +28,8 @@ from bumble.core import (
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import Connection, Device
from bumble.host import Host
from bumble.device import AdvertisingParameters, Connection, Device
from bumble.host import AclPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
HCI_COMMAND_STATUS_PENDING,
@@ -50,6 +50,9 @@ from bumble.gatt import (
GATT_APPEARANCE_CHARACTERISTIC,
)
from .test_utils import TwoDevices, async_barrier
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -73,6 +76,13 @@ async def test_device_connect_parallel():
d1 = Device(host=Host(None, None))
d2 = Device(host=Host(None, None))
def _send(packet):
pass
d0.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
d1.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
d2.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
# enable classic
d0.classic_enabled = True
d1.classic_enabled = True
@@ -245,12 +255,12 @@ async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_legacy_advertising()
assert device.legacy_advertiser
await device.start_advertising()
assert device.is_advertising
# Stop advertising
await advertiser.stop()
assert not device.legacy_advertiser
await device.stop_advertising()
assert not device.is_advertising
# -----------------------------------------------------------------------------
@@ -264,7 +274,7 @@ async def test_legacy_advertising_connection(own_address_type):
peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising
advertiser = await device.start_legacy_advertising()
await device.start_advertising()
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
@@ -292,7 +302,7 @@ async def test_legacy_advertising_connection(own_address_type):
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart)
await device.start_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
@@ -301,20 +311,18 @@ async def test_legacy_advertising_disconnection(auto_restart):
ConnectionParameters(0, 0, 0),
)
device.start_legacy_advertising = mock.AsyncMock()
device.on_advertising_set_termination(
HCI_SUCCESS, device.legacy_advertising_set.advertising_handle, 0x0001, 0
)
device.on_disconnection(0x0001, 0)
await async_barrier()
await async_barrier()
if auto_restart:
device.start_legacy_advertising.assert_called_with(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
assert device.is_advertising
else:
device.start_legacy_advertising.assert_not_called()
assert not device.is_advertising
# -----------------------------------------------------------------------------
@@ -323,12 +331,13 @@ async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_extended_advertising()
assert device.extended_advertisers
advertising_set = await device.create_advertising_set()
assert device.extended_advertising_sets
assert advertising_set.enabled
# Stop advertising
await advertiser.stop()
assert not device.extended_advertisers
await advertising_set.stop()
assert not advertising_set.enabled
# -----------------------------------------------------------------------------
@@ -340,8 +349,8 @@ async def test_extended_advertising():
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(
own_address_type=own_address_type
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
)
device.on_connection(
0x0001,
@@ -352,8 +361,9 @@ async def test_extended_advertising_connection(own_address_type):
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
advertising_set.advertising_handle,
0x0001,
0,
)
if own_address_type == OwnAddressType.PUBLIC:
@@ -367,42 +377,60 @@ async def test_extended_advertising_connection(own_address_type):
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
async def test_get_remote_le_features():
devices = TwoDevices()
await devices.setup_connection()
device.start_extended_advertising = mock.AsyncMock()
assert (await devices.connections[0].get_remote_le_features()) is not None
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cis():
devices = TwoDevices()
await devices.setup_connection()
peripheral_cis_futures = {}
def on_cis_request(
acl_connection: Connection,
cis_handle: int,
_cig_id: int,
_cis_id: int,
):
acl_connection.abort_on(
'disconnection', devices[1].accept_cis_request(cis_handle)
)
else:
device.start_extended_advertising.assert_not_called()
peripheral_cis_futures[cis_handle] = asyncio.get_running_loop().create_future()
devices[1].on('cis_request', on_cis_request)
devices[1].on(
'cis_establishment',
lambda cis_link: peripheral_cis_futures[cis_link.handle].set_result(None),
)
cis_handles = await devices[0].setup_cig(
cig_id=1,
cis_id=[2, 3],
sdu_interval=(0, 0),
framing=0,
max_sdu=(0, 0),
retransmission_number=0,
max_transport_latency=(0, 0),
)
assert len(cis_handles) == 2
cis_links = await devices[0].create_cis(
[
(cis_handles[0], devices.connections[0].handle),
(cis_handles[1], devices.connections[0].handle),
]
)
await asyncio.gather(*peripheral_cis_futures.values())
assert len(cis_links) == 2
await cis_links[0].disconnect()
await cis_links[1].disconnect()
# -----------------------------------------------------------------------------

View File

@@ -20,11 +20,10 @@ import logging
import os
import struct
import pytest
from unittest.mock import Mock, ANY
from unittest.mock import AsyncMock, Mock, ANY
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_server import Server
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -51,6 +50,7 @@ from bumble.att import (
ATT_Error_Response,
ATT_Read_By_Group_Type_Request,
)
from .test_utils import async_barrier
# -----------------------------------------------------------------------------
@@ -120,9 +120,9 @@ async def test_characteristic_encoding():
Characteristic.READABLE,
123,
)
x = c.read_value(None)
x = await c.read_value(None)
assert x == bytes([123])
c.write_value(None, bytes([122]))
await c.write_value(None, bytes([122]))
assert c.value == 122
class FooProxy(CharacteristicProxy):
@@ -152,7 +152,22 @@ async def test_characteristic_encoding():
bytes([123]),
)
service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic])
async def async_read(connection):
return 0x05060708
async_characteristic = PackedCharacteristicAdapter(
Characteristic(
'2AB7E91B-43E8-4F73-AC3B-80C1683B47F9',
Characteristic.Properties.READ,
Characteristic.READABLE,
CharacteristicValue(read=async_read),
),
'>I',
)
service = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic, async_characteristic]
)
server.add_service(service)
await client.power_on()
@@ -184,6 +199,13 @@ async def test_characteristic_encoding():
await async_barrier()
assert characteristic.value == bytes([50])
c2 = peer.get_characteristics_by_uuid(async_characteristic.uuid)
assert len(c2) == 1
c2 = c2[0]
cd2 = PackedCharacteristicAdapter(c2, ">I")
cd2v = await cd2.read_value()
assert cd2v == 0x05060708
last_change = None
def on_change(value):
@@ -285,7 +307,8 @@ async def test_attribute_getters():
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
@pytest.mark.asyncio
async def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3])
c = Characteristic(
@@ -296,11 +319,11 @@ def test_CharacteristicAdapter():
)
a = CharacteristicAdapter(c)
value = a.read_value(None)
value = await a.read_value(None)
assert value == v
v = bytes([3, 4, 5])
a.write_value(None, v)
await a.write_value(None, v)
assert c.value == v
# Simple delegated adapter
@@ -308,11 +331,11 @@ def test_CharacteristicAdapter():
c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))
)
value = a.read_value(None)
value = await a.read_value(None)
assert value == bytes(reversed(v))
v = bytes([3, 4, 5])
a.write_value(None, v)
await a.write_value(None, v)
assert a.value == bytes(reversed(v))
# Packed adapter with single element format
@@ -321,10 +344,10 @@ def test_CharacteristicAdapter():
c.value = v
a = PackedCharacteristicAdapter(c, '>H')
value = a.read_value(None)
value = await a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
await a.write_value(None, pv)
assert a.value == v
# Packed adapter with multi-element format
@@ -334,10 +357,10 @@ def test_CharacteristicAdapter():
c.value = (v1, v2)
a = PackedCharacteristicAdapter(c, '>HH')
value = a.read_value(None)
value = await a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
await a.write_value(None, pv)
assert a.value == (v1, v2)
# Mapped adapter
@@ -348,10 +371,10 @@ def test_CharacteristicAdapter():
c.value = mapped
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = a.read_value(None)
value = await a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
await a.write_value(None, pv)
assert a.value == mapped
# UTF-8 adapter
@@ -360,27 +383,49 @@ def test_CharacteristicAdapter():
c.value = v
a = UTF8CharacteristicAdapter(c)
value = a.read_value(None)
value = await a.read_value(None)
assert value == ev
c.value = None
a.write_value(None, ev)
await a.write_value(None, ev)
assert a.value == v
# -----------------------------------------------------------------------------
def test_CharacteristicValue():
@pytest.mark.asyncio
async def test_CharacteristicValue():
b = bytes([1, 2, 3])
c = CharacteristicValue(read=lambda _: b)
x = c.read(None)
async def read_value(connection):
return b
c = CharacteristicValue(read=read_value)
x = await c.read(None)
assert x == b
result = []
c = CharacteristicValue(
write=lambda connection, value: result.append((connection, value))
)
m = Mock()
c = CharacteristicValue(write=m)
z = object()
c.write(z, b)
assert result == [(z, b)]
m.assert_called_once_with(z, b)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicValue_async():
b = bytes([1, 2, 3])
async def read_value(connection):
return b
c = CharacteristicValue(read=read_value)
x = await c.read(None)
assert x == b
m = AsyncMock()
c = CharacteristicValue(write=m)
z = object()
await c.write(z, b)
m.assert_called_once_with(z, b)
# -----------------------------------------------------------------------------
@@ -412,13 +457,6 @@ class LinkedDevices:
self.paired = [None, None, None]
# -----------------------------------------------------------------------------
async def async_barrier():
ready = asyncio.get_running_loop().create_future()
asyncio.get_running_loop().call_soon(ready.set_result, None)
await ready
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
@@ -961,12 +999,18 @@ Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration
# -----------------------------------------------------------------------------
async def async_main():
test_UUID()
test_ATT_Error_Response()
test_ATT_Read_By_Group_Type_Request()
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
await test_unsubscribe()
await test_characteristic_encoding()
await test_mtu_exchange()
await test_CharacteristicValue()
await test_CharacteristicValue_async()
await test_CharacteristicAdapter()
# -----------------------------------------------------------------------------
@@ -1105,9 +1149,4 @@ def test_get_attribute_group():
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
test_UUID()
test_ATT_Error_Response()
test_ATT_Read_By_Group_Type_Request()
test_CharacteristicValue()
test_CharacteristicAdapter()
asyncio.run(async_main())

View File

@@ -23,6 +23,8 @@ from bumble.hci import (
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND,
HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
Address,
CodingFormat,
CodecID,
@@ -274,8 +276,14 @@ def test_HCI_Set_Event_Mask_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Event_Mask_Command():
command = HCI_LE_Set_Event_Mask_Command(
le_event_mask=bytes.fromhex('0011223344556677')
le_event_mask=HCI_LE_Set_Event_Mask_Command.mask(
[
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
]
)
)
assert command.le_event_mask == bytes.fromhex('0100000000010000')
basic_check(command)

View File

@@ -23,8 +23,10 @@ import pytest
from typing import Tuple
from .test_utils import TwoDevices
from bumble import core
from bumble import hfp
from bumble import rfcomm
from bumble import hci
# -----------------------------------------------------------------------------
@@ -87,6 +89,68 @@ async def test_slc():
ag_task.cancel()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sco_setup():
devices = TwoDevices()
# Enable Classic connections
devices[0].classic_enabled = True
devices[1].classic_enabled = True
# Start
await devices[0].power_on()
await devices[1].power_on()
connections = await asyncio.gather(
devices[0].connect(
devices[1].public_address, transport=core.BT_BR_EDR_TRANSPORT
),
devices[1].accept(devices[0].public_address),
)
def on_sco_request(_connection, _link_type: int):
connections[1].abort_on(
'disconnection',
devices[1].send_command(
hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
bd_addr=connections[1].peer_address,
**hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_CVSD_S1
].asdict(),
)
),
)
devices[1].on('sco_request', on_sco_request)
sco_connection_futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
for device, future in zip(devices, sco_connection_futures):
device.on('sco_connection', future.set_result)
await devices[0].send_command(
hci.HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1].asdict(),
)
)
sco_connections = await asyncio.gather(*sco_connection_futures)
sco_disconnection_futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
for future, sco_connection in zip(sco_disconnection_futures, sco_connections):
sco_connection.on('disconnection', future.set_result)
await sco_connections[0].disconnect()
await asyncio.gather(*sco_disconnection_futures)
# -----------------------------------------------------------------------------
async def run():
await test_slc()

View File

@@ -547,6 +547,13 @@ async def test_self_smp_over_classic():
MockSmpSession.send_public_key_command.assert_not_called()
MockSmpSession.send_pairing_random_command.assert_not_called()
for i in range(2):
assert (
await two_devices.devices[i].keystore.get(
str(two_devices.connections[i].peer_address)
)
).link_key
# -----------------------------------------------------------------------------
@pytest.mark.asyncio

View File

@@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
from typing import List, Optional
from bumble.controller import Controller
@@ -22,6 +26,7 @@ from bumble.transport import AsyncPipeSink
from bumble.hci import Address
# -----------------------------------------------------------------------------
class TwoDevices:
connections: List[Optional[Connection]]
@@ -29,17 +34,18 @@ class TwoDevices:
self.connections = [None, None]
self.link = LocalLink()
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.controllers = [
Controller('C1', link=self.link),
Controller('C2', link=self.link),
Controller('C1', link=self.link, public_address=addresses[0]),
Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
Device(
address=Address('F0:F1:F2:F3:F4:F5'),
address=Address(addresses[0]),
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
),
Device(
address=Address('F5:F4:F3:F2:F1:F0'),
address=Address(addresses[1]),
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
),
]
@@ -74,3 +80,10 @@ class TwoDevices:
def __getitem__(self, index: int) -> Device:
return self.devices[index]
# -----------------------------------------------------------------------------
async def async_barrier():
ready = asyncio.get_running_loop().create_future()
asyncio.get_running_loop().call_soon(ready.set_result, None)
await ready

View File

@@ -12,15 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import contextlib
import logging
import os
from bumble import utils
from pyee import EventEmitter
from unittest.mock import MagicMock
from pyee import EventEmitter
from bumble import utils
# -----------------------------------------------------------------------------
def test_on() -> None:
emitter = EventEmitter()
with contextlib.closing(utils.EventWatcher()) as context:
@@ -33,6 +38,7 @@ def test_on() -> None:
assert mock.call_count == 1
# -----------------------------------------------------------------------------
def test_on_decorator() -> None:
emitter = EventEmitter()
with contextlib.closing(utils.EventWatcher()) as context:
@@ -48,6 +54,7 @@ def test_on_decorator() -> None:
assert mock.call_count == 1
# -----------------------------------------------------------------------------
def test_multiple_handlers() -> None:
emitter = EventEmitter()
with contextlib.closing(utils.EventWatcher()) as context:
@@ -64,6 +71,30 @@ def test_multiple_handlers() -> None:
mock.assert_called_once_with('b')
# -----------------------------------------------------------------------------
def test_open_int_enums():
class Foo(utils.OpenIntEnum):
FOO = 1
BAR = 2
BLA = 3
x = Foo(1)
assert x.name == "FOO"
assert x.value == 1
assert int(x) == 1
assert x == 1
assert x + 1 == 2
x = Foo(4)
assert x.name == "Foo[4]"
assert x.value == 4
assert int(x) == 4
assert x == 4
assert x + 1 == 5
print(list(Foo))
# -----------------------------------------------------------------------------
def run_tests():
test_on()
@@ -75,3 +106,4 @@ def run_tests():
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
run_tests()
test_open_int_enums()