Compare commits

...

17 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
caa82b8f7e make cryptography a valid dependency for emscripten targets 2023-09-13 22:38:28 -07:00
Gilles Boccon-Gibod
5af347b499 Merge pull request #282 from google/gbg/multi-python-pre-commit-check
run pre-commit tests with all supported Python versions
2023-09-13 07:47:32 -07:00
zxzxwu
4ed5bb5a9e Merge pull request #281 from zxzxwu/cleanup-transport
Replace | typing usage with Optional and Union
2023-09-13 13:31:41 +08:00
Gilles Boccon-Gibod
2478d45673 more windows compat fixes 2023-09-12 14:52:42 -07:00
Gilles Boccon-Gibod
1bc7d94111 windows NamedTemporaryFile compatibility 2023-09-12 14:33:12 -07:00
Gilles Boccon-Gibod
6432414cd5 run tests on windows and mac in addition to linux 2023-09-12 13:50:15 -07:00
Gilles Boccon-Gibod
179064ba15 run pre-commit tests with all supported Python versions 2023-09-12 13:42:33 -07:00
William Escande
783b2d70a5 Add connection parameter update from peripheral 2023-09-12 11:08:04 -07:00
zxzxwu
80824f3fc1 Merge pull request #280 from zxzxwu/device_typing
Add terminated to TransportSource protocol
2023-09-12 20:46:35 +08:00
Josh Wu
f39f5f531c Replace | typing usage with Optional and Union 2023-09-12 15:50:51 +08:00
Gilles Boccon-Gibod
56139c622f Merge pull request #258 from mogenson/vsc_tx_power
Add support for Zephyr HCI VSC set TX power command
2023-09-11 21:34:11 -07:00
Michael Mogenson
da02f6a39b Add HCI Zephyr vendor commands to read and write TX power
Create platforms/zephyr/hci.py with definitions of vendor HCI commands
to read and write TX power.

Add documentation for how to prepare an nRF52840 dongle with a Zephyr
HCI USB firmware application that includes dynamic TX power support and
how to send a write TX power vendor HCI command from Bumble.
2023-09-11 10:06:10 -04:00
Josh Wu
548d5597c0 Transport: Add termination protocol signature 2023-09-11 14:36:40 +08:00
zxzxwu
7fd65d2412 Merge pull request #279 from zxzxwu/typo
Fix typo
2023-09-11 03:02:11 +08:00
Josh Wu
05a54a4af9 Fix typo 2023-09-10 20:32:58 +08:00
Gilles Boccon-Gibod
1e00c8f456 Merge pull request #276 from google/gbg/add-zephyr-zip-to-docs
add zephyr binary to docs
2023-09-08 18:07:15 -07:00
Gilles Boccon-Gibod
90d165aa01 add zephyr binary 2023-09-08 14:17:15 -07:00
23 changed files with 307 additions and 77 deletions

View File

@@ -14,6 +14,10 @@ jobs:
check: check:
name: Check Code name: Check Code
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
fail-fast: false
steps: steps:
- name: Check out from Git - name: Check out from Git

View File

@@ -12,10 +12,10 @@ permissions:
jobs: jobs:
build: build:
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
strategy: strategy:
matrix: matrix:
os: ['ubuntu-latest', 'macos-latest', 'windows-latest']
python-version: ["3.8", "3.9", "3.10", "3.11"] python-version: ["3.8", "3.9", "3.10", "3.11"]
fail-fast: false fail-fast: false
@@ -41,6 +41,7 @@ jobs:
run: | run: |
inv build inv build
inv build.mkdocs inv build.mkdocs
build-rust: build-rust:
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy: strategy:

View File

@@ -104,7 +104,7 @@ class SnoopPacketReader:
) )
@click.option( @click.option(
'--vendors', '--vendors',
type=click.Choice(['android']), type=click.Choice(['android', 'zephyr']),
multiple=True, multiple=True,
help='Support vendor-specific commands (list one or more)', help='Support vendor-specific commands (list one or more)',
) )
@@ -114,6 +114,8 @@ def main(format, vendors, filename):
for vendor in vendors: for vendor in vendors:
if vendor == 'android': if vendor == 'android':
import bumble.vendor.android.hci import bumble.vendor.android.hci
elif vendor == 'zephyr':
import bumble.vendor.zephyr.hci
input = open(filename, 'rb') input = open(filename, 'rb')
if format == 'h4': if format == 'h4':

View File

@@ -80,7 +80,7 @@ class BaseError(Exception):
def __init__( def __init__(
self, self,
error_code: int | None, error_code: Optional[int],
error_namespace: str = '', error_namespace: str = '',
error_name: str = '', error_name: str = '',
details: str = '', details: str = '',
@@ -142,6 +142,10 @@ class ConnectionError(BaseError): # pylint: disable=redefined-builtin
self.peer_address = peer_address self.peer_address = peer_address
class ConnectionParameterUpdateError(BaseError):
"""Connection Parameter Update Error"""
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# UUID # UUID
# #

View File

@@ -141,6 +141,7 @@ from .core import (
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
AdvertisingData, AdvertisingData,
ConnectionParameterUpdateError,
CommandTimeoutError, CommandTimeoutError,
ConnectionPHY, ConnectionPHY,
InvalidStateError, InvalidStateError,
@@ -723,6 +724,7 @@ class Connection(CompositeEventEmitter):
connection_interval_max, connection_interval_max,
max_latency, max_latency,
supervision_timeout, supervision_timeout,
use_l2cap=False,
): ):
return await self.device.update_connection_parameters( return await self.device.update_connection_parameters(
self, self,
@@ -730,6 +732,7 @@ class Connection(CompositeEventEmitter):
connection_interval_max, connection_interval_max,
max_latency, max_latency,
supervision_timeout, supervision_timeout,
use_l2cap=use_l2cap,
) )
async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None): async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
@@ -2110,11 +2113,30 @@ class Device(CompositeEventEmitter):
supervision_timeout, supervision_timeout,
min_ce_length=0, min_ce_length=0,
max_ce_length=0, max_ce_length=0,
): use_l2cap=False,
) -> None:
''' '''
NOTE: the name of the parameters may look odd, but it just follows the names NOTE: the name of the parameters may look odd, but it just follows the names
used in the Bluetooth spec. used in the Bluetooth spec.
''' '''
if use_l2cap:
if connection.role != BT_PERIPHERAL_ROLE:
raise InvalidStateError(
'only peripheral can update connection parameters with l2cap'
)
l2cap_result = (
await self.l2cap_channel_manager.update_connection_parameters(
connection,
connection_interval_min,
connection_interval_max,
max_latency,
supervision_timeout,
)
)
if l2cap_result != l2cap.L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT:
raise ConnectionParameterUpdateError(l2cap_result)
result = await self.send_command( result = await self.send_command(
HCI_LE_Connection_Update_Command( HCI_LE_Connection_Update_Command(
connection_handle=connection.handle, connection_handle=connection.handle,
@@ -2124,7 +2146,7 @@ class Device(CompositeEventEmitter):
supervision_timeout=supervision_timeout, supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length, min_ce_length=min_ce_length,
max_ce_length=max_ce_length, max_ce_length=max_ce_length,
) ) # type: ignore[call-arg]
) )
if result.status != HCI_Command_Status_Event.PENDING: if result.status != HCI_Command_Status_Event.PENDING:
raise HCI_StatusError(result) raise HCI_StatusError(result)

View File

@@ -4397,7 +4397,7 @@ class HCI_Event(HCI_Packet):
if len(parameters) != length: if len(parameters) != length:
raise ValueError('invalid packet length') raise ValueError('invalid packet length')
cls: Type[HCI_Event | HCI_LE_Meta_Event] | None cls: Any
if event_code == HCI_LE_META_EVENT: if event_code == HCI_LE_META_EVENT:
# We do this dispatch here and not in the subclass in order to avoid call # We do this dispatch here and not in the subclass in order to avoid call
# loops # loops

View File

@@ -757,7 +757,7 @@ class Channel(EventEmitter):
) )
self.state = new_state self.state = new_state
def send_pdu(self, pdu: SupportsBytes | bytes) -> None: def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
self.manager.send_pdu(self.connection, self.destination_cid, pdu) self.manager.send_pdu(self.connection, self.destination_cid, pdu)
def send_control_frame(self, frame: L2CAP_Control_Frame) -> None: def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
@@ -1098,7 +1098,7 @@ class LeConnectionOrientedChannel(EventEmitter):
elif new_state == self.DISCONNECTED: elif new_state == self.DISCONNECTED:
self.emit('close') self.emit('close')
def send_pdu(self, pdu: SupportsBytes | bytes) -> None: def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
self.manager.send_pdu(self.connection, self.destination_cid, pdu) self.manager.send_pdu(self.connection, self.destination_cid, pdu)
def send_control_frame(self, frame: L2CAP_Control_Frame) -> None: def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
@@ -1387,6 +1387,7 @@ class ChannelManager:
le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request] le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request]
fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]] fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]]
_host: Optional[Host] _host: Optional[Host]
connection_parameters_update_response: Optional[asyncio.Future[int]]
def __init__( def __init__(
self, self,
@@ -1408,6 +1409,7 @@ class ChannelManager:
self.le_coc_requests = {} # LE CoC connection requests, by identifier self.le_coc_requests = {} # LE CoC connection requests, by identifier
self.extended_features = extended_features self.extended_features = extended_features
self.connectionless_mtu = connectionless_mtu self.connectionless_mtu = connectionless_mtu
self.connection_parameters_update_response = None
@property @property
def host(self) -> Host: def host(self) -> Host:
@@ -1569,7 +1571,7 @@ class ChannelManager:
if connection_handle in self.identifiers: if connection_handle in self.identifiers:
del self.identifiers[connection_handle] del self.identifiers[connection_handle]
def send_pdu(self, connection, cid: int, pdu: SupportsBytes | bytes) -> None: def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None:
pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu) pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
logger.debug( logger.debug(
f'{color(">>> Sending L2CAP PDU", "blue")} ' f'{color(">>> Sending L2CAP PDU", "blue")} '
@@ -1865,11 +1867,45 @@ class ChannelManager:
), ),
) )
async def update_connection_parameters(
self,
connection: Connection,
interval_min: int,
interval_max: int,
latency: int,
timeout: int,
) -> int:
# Check that there isn't already a request pending
if self.connection_parameters_update_response:
raise InvalidStateError('request already pending')
self.connection_parameters_update_response = (
asyncio.get_running_loop().create_future()
)
self.send_control_frame(
connection,
L2CAP_LE_SIGNALING_CID,
L2CAP_Connection_Parameter_Update_Request(
interval_min=interval_min,
interval_max=interval_max,
latency=latency,
timeout=timeout,
),
)
return await self.connection_parameters_update_response
def on_l2cap_connection_parameter_update_response( def on_l2cap_connection_parameter_update_response(
self, connection: Connection, cid: int, response self, connection: Connection, cid: int, response
) -> None: ) -> None:
# TODO: check response if self.connection_parameters_update_response:
pass self.connection_parameters_update_response.set_result(response.result)
self.connection_parameters_update_response = None
else:
logger.warning(
color(
'received l2cap_connection_parameter_update_response without a pending request',
'red',
)
)
def on_l2cap_le_credit_based_connection_request( def on_l2cap_le_credit_based_connection_request(
self, connection: Connection, cid: int, request self, connection: Connection, cid: int, request

View File

@@ -18,6 +18,8 @@
import logging import logging
import grpc.aio import grpc.aio
from typing import Optional, Union
from .common import PumpedTransport, PumpedPacketSource, PumpedPacketSink, Transport from .common import PumpedTransport, PumpedPacketSource, PumpedPacketSink, Transport
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
@@ -33,7 +35,7 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def open_android_emulator_transport(spec: str | None) -> Transport: async def open_android_emulator_transport(spec: Optional[str]) -> Transport:
''' '''
Open a transport connection to an Android emulator via its gRPC interface. Open a transport connection to an Android emulator via its gRPC interface.
The parameter string has this syntax: The parameter string has this syntax:
@@ -82,7 +84,7 @@ async def open_android_emulator_transport(spec: str | None) -> Transport:
logger.debug(f'connecting to gRPC server at {server_address}') logger.debug(f'connecting to gRPC server at {server_address}')
channel = grpc.aio.insecure_channel(server_address) channel = grpc.aio.insecure_channel(server_address)
service: EmulatedBluetoothServiceStub | VhciForwardingServiceStub service: Union[EmulatedBluetoothServiceStub, VhciForwardingServiceStub]
if mode == 'host': if mode == 'host':
# Connect as a host # Connect as a host
service = EmulatedBluetoothServiceStub(channel) service = EmulatedBluetoothServiceStub(channel)

View File

@@ -122,7 +122,7 @@ def publish_grpc_port(grpc_port) -> bool:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def open_android_netsim_controller_transport( async def open_android_netsim_controller_transport(
server_host: str | None, server_port: int server_host: Optional[str], server_port: int
) -> Transport: ) -> Transport:
if not server_port: if not server_port:
raise ValueError('invalid port') raise ValueError('invalid port')

View File

@@ -63,6 +63,8 @@ class TransportSink(Protocol):
class TransportSource(Protocol): class TransportSource(Protocol):
terminated: asyncio.Future[None]
def set_packet_sink(self, sink: TransportSink) -> None: def set_packet_sink(self, sink: TransportSink) -> None:
... ...
@@ -430,6 +432,7 @@ class SnoopingTransport(Transport):
def __init__(self, source: TransportSource, snooper: Snooper): def __init__(self, source: TransportSource, snooper: Snooper):
self.source = source self.source = source
self.snooper = snooper self.snooper = snooper
self.terminated = source.terminated
def set_packet_sink(self, sink: TransportSink) -> None: def set_packet_sink(self, sink: TransportSink) -> None:
self.sink = sink self.sink = sink

View File

@@ -23,6 +23,8 @@ import socket
import ctypes import ctypes
import collections import collections
from typing import Optional
from .common import Transport, ParserSource from .common import Transport, ParserSource
@@ -33,7 +35,7 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def open_hci_socket_transport(spec: str | None) -> Transport: async def open_hci_socket_transport(spec: Optional[str]) -> Transport:
''' '''
Open an HCI Socket (only available on some platforms). Open an HCI Socket (only available on some platforms).
The parameter string is either empty (to use the first/default Bluetooth adapter) The parameter string is either empty (to use the first/default Bluetooth adapter)

View File

@@ -23,6 +23,8 @@ import atexit
import os import os
import logging import logging
from typing import Optional
from .common import Transport, StreamPacketSource, StreamPacketSink from .common import Transport, StreamPacketSource, StreamPacketSink
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -32,7 +34,7 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def open_pty_transport(spec: str | None) -> Transport: async def open_pty_transport(spec: Optional[str]) -> Transport:
''' '''
Open a PTY transport. Open a PTY transport.
The parameter string may be empty, or a path name where a symbolic link The parameter string may be empty, or a path name where a symbolic link

View File

@@ -17,6 +17,8 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import logging import logging
from typing import Optional
from .common import Transport from .common import Transport
from .file import open_file_transport from .file import open_file_transport
@@ -27,7 +29,7 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def open_vhci_transport(spec: str | None) -> Transport: async def open_vhci_transport(spec: Optional[str]) -> Transport:
''' '''
Open a VHCI transport (only available on some platforms). Open a VHCI transport (only available on some platforms).
The parameter string is either empty (to use the default VHCI device The parameter string is either empty (to use the default VHCI device

0
bumble/vendor/zephyr/__init__.py vendored Normal file
View File

88
bumble/vendor/zephyr/hci.py vendored Normal file
View File

@@ -0,0 +1,88 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from bumble.hci import (
hci_vendor_command_op_code,
HCI_Command,
STATUS_SPEC,
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# Zephyr RTOS Vendor Specific Commands and Events.
# Only a subset of the commands are implemented here currently.
#
# pylint: disable-next=line-too-long
# See https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
HCI_WRITE_TX_POWER_LEVEL_COMMAND = hci_vendor_command_op_code(0x000E)
HCI_READ_TX_POWER_LEVEL_COMMAND = hci_vendor_command_op_code(0x000F)
HCI_Command.register_commands(globals())
# -----------------------------------------------------------------------------
class TX_Power_Level_Command:
'''
Base class for read and write TX power level HCI commands
'''
TX_POWER_HANDLE_TYPE_ADV = 0x00
TX_POWER_HANDLE_TYPE_SCAN = 0x01
TX_POWER_HANDLE_TYPE_CONN = 0x02
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('handle_type', 1), ('connection_handle', 2), ('tx_power_level', -1)],
return_parameters_fields=[
('status', STATUS_SPEC),
('handle_type', 1),
('connection_handle', 2),
('selected_tx_power_level', -1),
],
)
class HCI_Write_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
'''
Write TX power level. See BT_HCI_OP_VS_WRITE_TX_POWER_LEVEL in
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
Power level is in dB. Connection handle for TX_POWER_HANDLE_TYPE_ADV and
TX_POWER_HANDLE_TYPE_SCAN should be zero.
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('handle_type', 1), ('connection_handle', 2)],
return_parameters_fields=[
('status', STATUS_SPEC),
('handle_type', 1),
('connection_handle', 2),
('tx_power_level', -1),
],
)
class HCI_Read_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
'''
Read TX power level. See BT_HCI_OP_VS_READ_TX_POWER_LEVEL in
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
Power level is in dB. Connection handle for TX_POWER_HANDLE_TYPE_ADV and
TX_POWER_HANDLE_TYPE_SCAN should be zero.
'''

View File

@@ -64,6 +64,7 @@ nav:
- Linux: platforms/linux.md - Linux: platforms/linux.md
- Windows: platforms/windows.md - Windows: platforms/windows.md
- Android: platforms/android.md - Android: platforms/android.md
- Zephyr: platforms/zephyr.md
- Examples: - Examples:
- Overview: examples/index.md - Overview: examples/index.md

Binary file not shown.

View File

@@ -9,3 +9,4 @@ For platform-specific information, see the following pages:
* :material-linux: Linux - see the [Linux platform page](linux.md) * :material-linux: Linux - see the [Linux platform page](linux.md)
* :material-microsoft-windows: Windows - see the [Windows platform page](windows.md) * :material-microsoft-windows: Windows - see the [Windows platform page](windows.md)
* :material-android: Android - see the [Android platform page](android.md) * :material-android: Android - see the [Android platform page](android.md)
* :material-memory: Zephyr - see the [Zephyr platform page](zephyr.md)

View File

@@ -0,0 +1,51 @@
:material-memory: ZEPHYR PLATFORM
=================================
Set TX Power on nRF52840
------------------------
The Nordic nRF52840 supports Zephyr's vendor specific HCI command for setting TX
power during advertising, connection, or scanning. With the example [HCI
USB](https://docs.zephyrproject.org/latest/samples/bluetooth/hci_usb/README.html)
application, an [nRF52840
dongle](https://www.nordicsemi.com/Products/Development-
hardware/nRF52840-Dongle) can be used as a Bumble controller.
To add dynamic TX power support to the HCI USB application, add the following to
`zephyr/samples/bluetooth/hci_usb/prj.conf` and build.
```
CONFIG_BT_CTLR_ADVANCED_FEATURES=y
CONFIG_BT_CTLR_CONN_RSSI=y
CONFIG_BT_CTLR_TX_PWR_DYNAMIC_CONTROL=y
```
Alternatively, a prebuilt firmware application can be downloaded here:
[hci_usb.zip](../downloads/zephyr/hci_usb.zip).
Put the nRF52840 dongle into bootloader mode by pressing the RESET button. The
LED should pulse red. Load the firmware application with the `nrfutil` tool:
```
nrfutil dfu usb-serial -pkg hci_usb.zip -p /dev/ttyACM0
```
The vendor specific HCI commands to read and write TX power are defined in
`bumble/vendor/zephyr/hci.py` and may be used as such:
```python
from bumble.vendor.zephyr.hci import HCI_Write_Tx_Power_Level_Command
# set advertising power to -4 dB
response = await host.send_command(
HCI_Write_Tx_Power_Level_Command(
handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV,
connection_handle=0,
tx_power_level=-4,
)
)
if response.return_parameters.status == HCI_SUCCESS:
print(f"TX power set to {response.return_parameters.selected_tx_power_level}")
```

View File

@@ -35,7 +35,7 @@ install_requires =
appdirs >= 1.4; platform_system!='Emscripten' appdirs >= 1.4; platform_system!='Emscripten'
bt-test-interfaces >= 0.0.2; platform_system!='Emscripten' bt-test-interfaces >= 0.0.2; platform_system!='Emscripten'
click == 8.1.3; platform_system!='Emscripten' click == 8.1.3; platform_system!='Emscripten'
cryptography == 39; platform_system!='Emscripten' cryptography == 39
grpcio == 1.57.0; platform_system!='Emscripten' grpcio == 1.57.0; platform_system!='Emscripten'
humanize >= 4.6.0; platform_system!='Emscripten' humanize >= 4.6.0; platform_system!='Emscripten'
libusb1 >= 2.0.1; platform_system!='Emscripten' libusb1 >= 2.0.1; platform_system!='Emscripten'

View File

@@ -53,7 +53,7 @@ async def make_hfp_connections(
client_dlc = await client_mux.open_dlc(rfcomm_channel) client_dlc = await client_mux.open_dlc(rfcomm_channel)
server_dlc = await wait_dlc server_dlc = await wait_dlc
# Setup HFP connnection # Setup HFP connection
hf = hfp.HfProtocol(client_dlc, hf_config) hf = hfp.HfProtocol(client_dlc, hf_config)
ag = hfp.HfpProtocol(server_dlc) ag = hfp.HfpProtocol(server_dlc)
return hf, ag return hf, ag

View File

@@ -18,6 +18,8 @@
import asyncio import asyncio
import json import json
import logging import logging
import pathlib
import pytest
import tempfile import tempfile
import os import os
@@ -83,87 +85,95 @@ JSON3 = """
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def test_basic(): @pytest.fixture
with tempfile.NamedTemporaryFile(mode="r+", encoding='utf-8') as file: def temporary_file():
keystore = JsonKeyStore('my_namespace', file.name) file = tempfile.NamedTemporaryFile(delete=False)
file.close()
yield file.name
pathlib.Path(file.name).unlink()
# -----------------------------------------------------------------------------
async def test_basic(temporary_file):
with open(temporary_file, mode='w', encoding='utf-8') as file:
file.write("{}") file.write("{}")
file.flush() file.flush()
keys = await keystore.get_all() keystore = JsonKeyStore('my_namespace', temporary_file)
assert len(keys) == 0
keys = PairingKeys() keys = await keystore.get_all()
await keystore.update('foo', keys) assert len(keys) == 0
foo = await keystore.get('foo')
assert foo is not None
assert foo.ltk is None
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
keys.ltk = PairingKeys.Key(ltk)
await keystore.update('foo', keys)
foo = await keystore.get('foo')
assert foo is not None
assert foo.ltk is not None
assert foo.ltk.value == ltk
file.flush() keys = PairingKeys()
with open(file.name, "r", encoding="utf-8") as json_file: await keystore.update('foo', keys)
json_data = json.load(json_file) foo = await keystore.get('foo')
assert 'my_namespace' in json_data assert foo is not None
assert 'foo' in json_data['my_namespace'] assert foo.ltk is None
assert 'ltk' in json_data['my_namespace']['foo'] ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
keys.ltk = PairingKeys.Key(ltk)
await keystore.update('foo', keys)
foo = await keystore.get('foo')
assert foo is not None
assert foo.ltk is not None
assert foo.ltk.value == ltk
with open(file.name, "r", encoding="utf-8") as json_file:
json_data = json.load(json_file)
assert 'my_namespace' in json_data
assert 'foo' in json_data['my_namespace']
assert 'ltk' in json_data['my_namespace']['foo']
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def test_parsing(): async def test_parsing(temporary_file):
with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file: with open(temporary_file, mode='w', encoding='utf-8') as file:
keystore = JsonKeyStore('my_namespace', file.name)
file.write(JSON1) file.write(JSON1)
file.flush() file.flush()
foo = await keystore.get('14:7D:DA:4E:53:A8/P') keystore = JsonKeyStore('my_namespace', file.name)
assert foo is not None foo = await keystore.get('14:7D:DA:4E:53:A8/P')
assert foo.ltk.value == bytes.fromhex('d1897ee10016eb1a08e4e037fd54c683') assert foo is not None
assert foo.ltk.value == bytes.fromhex('d1897ee10016eb1a08e4e037fd54c683')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def test_default_namespace(): async def test_default_namespace(temporary_file):
with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file: with open(temporary_file, mode='w', encoding='utf-8') as file:
keystore = JsonKeyStore(None, file.name)
file.write(JSON1) file.write(JSON1)
file.flush() file.flush()
all_keys = await keystore.get_all() keystore = JsonKeyStore(None, file.name)
assert len(all_keys) == 1 all_keys = await keystore.get_all()
name, keys = all_keys[0] assert len(all_keys) == 1
assert name == '14:7D:DA:4E:53:A8/P' name, keys = all_keys[0]
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1') assert name == '14:7D:DA:4E:53:A8/P'
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file: with open(temporary_file, mode='w', encoding='utf-8') as file:
keystore = JsonKeyStore(None, file.name)
file.write(JSON2) file.write(JSON2)
file.flush() file.flush()
keys = PairingKeys() keystore = JsonKeyStore(None, file.name)
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) keys = PairingKeys()
keys.ltk = PairingKeys.Key(ltk) ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
await keystore.update('foo', keys) keys.ltk = PairingKeys.Key(ltk)
file.flush() await keystore.update('foo', keys)
with open(file.name, "r", encoding="utf-8") as json_file: with open(file.name, "r", encoding="utf-8") as json_file:
json_data = json.load(json_file) json_data = json.load(json_file)
assert '__DEFAULT__' in json_data assert '__DEFAULT__' in json_data
assert 'foo' in json_data['__DEFAULT__'] assert 'foo' in json_data['__DEFAULT__']
assert 'ltk' in json_data['__DEFAULT__']['foo'] assert 'ltk' in json_data['__DEFAULT__']['foo']
with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file: with open(temporary_file, mode='w', encoding='utf-8') as file:
keystore = JsonKeyStore(None, file.name)
file.write(JSON3) file.write(JSON3)
file.flush() file.flush()
all_keys = await keystore.get_all() keystore = JsonKeyStore(None, file.name)
assert len(all_keys) == 1 all_keys = await keystore.get_all()
name, keys = all_keys[0] assert len(all_keys) == 1
assert name == '14:7D:DA:4E:53:A8/P' name, keys = all_keys[0]
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1') assert name == '14:7D:DA:4E:53:A8/P'
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -74,7 +74,6 @@ export async function loadBumble(pyodide, bumblePackage) {
await pyodide.loadPackage("micropip"); await pyodide.loadPackage("micropip");
await pyodide.runPythonAsync(` await pyodide.runPythonAsync(`
import micropip import micropip
await micropip.install("cryptography")
await micropip.install("${bumblePackage}") await micropip.install("${bumblePackage}")
package_list = micropip.list() package_list = micropip.list()
print(package_list) print(package_list)