forked from auracaster/bumble_mirror
Compare commits
13 Commits
v0.0.167
...
gbg/multi-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2478d45673 | ||
|
|
1bc7d94111 | ||
|
|
6432414cd5 | ||
|
|
179064ba15 | ||
|
|
783b2d70a5 | ||
|
|
80824f3fc1 | ||
|
|
56139c622f | ||
|
|
da02f6a39b | ||
|
|
548d5597c0 | ||
|
|
7fd65d2412 | ||
|
|
05a54a4af9 | ||
|
|
1e00c8f456 | ||
|
|
90d165aa01 |
4
.github/workflows/code-check.yml
vendored
4
.github/workflows/code-check.yml
vendored
@@ -14,6 +14,10 @@ jobs:
|
|||||||
check:
|
check:
|
||||||
name: Check Code
|
name: Check Code
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
python-version: ["3.8", "3.9", "3.10", "3.11"]
|
||||||
|
fail-fast: false
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Check out from Git
|
- name: Check out from Git
|
||||||
|
|||||||
5
.github/workflows/python-build-test.yml
vendored
5
.github/workflows/python-build-test.yml
vendored
@@ -12,10 +12,10 @@ permissions:
|
|||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
|
runs-on: ${{ matrix.os }}
|
||||||
runs-on: ubuntu-latest
|
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
|
os: ['ubuntu-latest', 'macos-latest', 'windows-latest']
|
||||||
python-version: ["3.8", "3.9", "3.10", "3.11"]
|
python-version: ["3.8", "3.9", "3.10", "3.11"]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
|
|
||||||
@@ -41,6 +41,7 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
inv build
|
inv build
|
||||||
inv build.mkdocs
|
inv build.mkdocs
|
||||||
|
|
||||||
build-rust:
|
build-rust:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
|
|||||||
@@ -104,7 +104,7 @@ class SnoopPacketReader:
|
|||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
'--vendors',
|
'--vendors',
|
||||||
type=click.Choice(['android']),
|
type=click.Choice(['android', 'zephyr']),
|
||||||
multiple=True,
|
multiple=True,
|
||||||
help='Support vendor-specific commands (list one or more)',
|
help='Support vendor-specific commands (list one or more)',
|
||||||
)
|
)
|
||||||
@@ -114,6 +114,8 @@ def main(format, vendors, filename):
|
|||||||
for vendor in vendors:
|
for vendor in vendors:
|
||||||
if vendor == 'android':
|
if vendor == 'android':
|
||||||
import bumble.vendor.android.hci
|
import bumble.vendor.android.hci
|
||||||
|
elif vendor == 'zephyr':
|
||||||
|
import bumble.vendor.zephyr.hci
|
||||||
|
|
||||||
input = open(filename, 'rb')
|
input = open(filename, 'rb')
|
||||||
if format == 'h4':
|
if format == 'h4':
|
||||||
|
|||||||
@@ -142,6 +142,10 @@ class ConnectionError(BaseError): # pylint: disable=redefined-builtin
|
|||||||
self.peer_address = peer_address
|
self.peer_address = peer_address
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectionParameterUpdateError(BaseError):
|
||||||
|
"""Connection Parameter Update Error"""
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# UUID
|
# UUID
|
||||||
#
|
#
|
||||||
|
|||||||
@@ -141,6 +141,7 @@ from .core import (
|
|||||||
BT_LE_TRANSPORT,
|
BT_LE_TRANSPORT,
|
||||||
BT_PERIPHERAL_ROLE,
|
BT_PERIPHERAL_ROLE,
|
||||||
AdvertisingData,
|
AdvertisingData,
|
||||||
|
ConnectionParameterUpdateError,
|
||||||
CommandTimeoutError,
|
CommandTimeoutError,
|
||||||
ConnectionPHY,
|
ConnectionPHY,
|
||||||
InvalidStateError,
|
InvalidStateError,
|
||||||
@@ -723,6 +724,7 @@ class Connection(CompositeEventEmitter):
|
|||||||
connection_interval_max,
|
connection_interval_max,
|
||||||
max_latency,
|
max_latency,
|
||||||
supervision_timeout,
|
supervision_timeout,
|
||||||
|
use_l2cap=False,
|
||||||
):
|
):
|
||||||
return await self.device.update_connection_parameters(
|
return await self.device.update_connection_parameters(
|
||||||
self,
|
self,
|
||||||
@@ -730,6 +732,7 @@ class Connection(CompositeEventEmitter):
|
|||||||
connection_interval_max,
|
connection_interval_max,
|
||||||
max_latency,
|
max_latency,
|
||||||
supervision_timeout,
|
supervision_timeout,
|
||||||
|
use_l2cap=use_l2cap,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
|
async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
|
||||||
@@ -2110,11 +2113,30 @@ class Device(CompositeEventEmitter):
|
|||||||
supervision_timeout,
|
supervision_timeout,
|
||||||
min_ce_length=0,
|
min_ce_length=0,
|
||||||
max_ce_length=0,
|
max_ce_length=0,
|
||||||
):
|
use_l2cap=False,
|
||||||
|
) -> None:
|
||||||
'''
|
'''
|
||||||
NOTE: the name of the parameters may look odd, but it just follows the names
|
NOTE: the name of the parameters may look odd, but it just follows the names
|
||||||
used in the Bluetooth spec.
|
used in the Bluetooth spec.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
if use_l2cap:
|
||||||
|
if connection.role != BT_PERIPHERAL_ROLE:
|
||||||
|
raise InvalidStateError(
|
||||||
|
'only peripheral can update connection parameters with l2cap'
|
||||||
|
)
|
||||||
|
l2cap_result = (
|
||||||
|
await self.l2cap_channel_manager.update_connection_parameters(
|
||||||
|
connection,
|
||||||
|
connection_interval_min,
|
||||||
|
connection_interval_max,
|
||||||
|
max_latency,
|
||||||
|
supervision_timeout,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if l2cap_result != l2cap.L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT:
|
||||||
|
raise ConnectionParameterUpdateError(l2cap_result)
|
||||||
|
|
||||||
result = await self.send_command(
|
result = await self.send_command(
|
||||||
HCI_LE_Connection_Update_Command(
|
HCI_LE_Connection_Update_Command(
|
||||||
connection_handle=connection.handle,
|
connection_handle=connection.handle,
|
||||||
@@ -2124,7 +2146,7 @@ class Device(CompositeEventEmitter):
|
|||||||
supervision_timeout=supervision_timeout,
|
supervision_timeout=supervision_timeout,
|
||||||
min_ce_length=min_ce_length,
|
min_ce_length=min_ce_length,
|
||||||
max_ce_length=max_ce_length,
|
max_ce_length=max_ce_length,
|
||||||
)
|
) # type: ignore[call-arg]
|
||||||
)
|
)
|
||||||
if result.status != HCI_Command_Status_Event.PENDING:
|
if result.status != HCI_Command_Status_Event.PENDING:
|
||||||
raise HCI_StatusError(result)
|
raise HCI_StatusError(result)
|
||||||
|
|||||||
@@ -1387,6 +1387,7 @@ class ChannelManager:
|
|||||||
le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request]
|
le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request]
|
||||||
fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]]
|
fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]]
|
||||||
_host: Optional[Host]
|
_host: Optional[Host]
|
||||||
|
connection_parameters_update_response: Optional[asyncio.Future[int]]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -1408,6 +1409,7 @@ class ChannelManager:
|
|||||||
self.le_coc_requests = {} # LE CoC connection requests, by identifier
|
self.le_coc_requests = {} # LE CoC connection requests, by identifier
|
||||||
self.extended_features = extended_features
|
self.extended_features = extended_features
|
||||||
self.connectionless_mtu = connectionless_mtu
|
self.connectionless_mtu = connectionless_mtu
|
||||||
|
self.connection_parameters_update_response = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def host(self) -> Host:
|
def host(self) -> Host:
|
||||||
@@ -1865,11 +1867,45 @@ class ChannelManager:
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def update_connection_parameters(
|
||||||
|
self,
|
||||||
|
connection: Connection,
|
||||||
|
interval_min: int,
|
||||||
|
interval_max: int,
|
||||||
|
latency: int,
|
||||||
|
timeout: int,
|
||||||
|
) -> int:
|
||||||
|
# Check that there isn't already a request pending
|
||||||
|
if self.connection_parameters_update_response:
|
||||||
|
raise InvalidStateError('request already pending')
|
||||||
|
self.connection_parameters_update_response = (
|
||||||
|
asyncio.get_running_loop().create_future()
|
||||||
|
)
|
||||||
|
self.send_control_frame(
|
||||||
|
connection,
|
||||||
|
L2CAP_LE_SIGNALING_CID,
|
||||||
|
L2CAP_Connection_Parameter_Update_Request(
|
||||||
|
interval_min=interval_min,
|
||||||
|
interval_max=interval_max,
|
||||||
|
latency=latency,
|
||||||
|
timeout=timeout,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return await self.connection_parameters_update_response
|
||||||
|
|
||||||
def on_l2cap_connection_parameter_update_response(
|
def on_l2cap_connection_parameter_update_response(
|
||||||
self, connection: Connection, cid: int, response
|
self, connection: Connection, cid: int, response
|
||||||
) -> None:
|
) -> None:
|
||||||
# TODO: check response
|
if self.connection_parameters_update_response:
|
||||||
pass
|
self.connection_parameters_update_response.set_result(response.result)
|
||||||
|
self.connection_parameters_update_response = None
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
color(
|
||||||
|
'received l2cap_connection_parameter_update_response without a pending request',
|
||||||
|
'red',
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def on_l2cap_le_credit_based_connection_request(
|
def on_l2cap_le_credit_based_connection_request(
|
||||||
self, connection: Connection, cid: int, request
|
self, connection: Connection, cid: int, request
|
||||||
|
|||||||
@@ -63,6 +63,8 @@ class TransportSink(Protocol):
|
|||||||
|
|
||||||
|
|
||||||
class TransportSource(Protocol):
|
class TransportSource(Protocol):
|
||||||
|
terminated: asyncio.Future[None]
|
||||||
|
|
||||||
def set_packet_sink(self, sink: TransportSink) -> None:
|
def set_packet_sink(self, sink: TransportSink) -> None:
|
||||||
...
|
...
|
||||||
|
|
||||||
@@ -430,6 +432,7 @@ class SnoopingTransport(Transport):
|
|||||||
def __init__(self, source: TransportSource, snooper: Snooper):
|
def __init__(self, source: TransportSource, snooper: Snooper):
|
||||||
self.source = source
|
self.source = source
|
||||||
self.snooper = snooper
|
self.snooper = snooper
|
||||||
|
self.terminated = source.terminated
|
||||||
|
|
||||||
def set_packet_sink(self, sink: TransportSink) -> None:
|
def set_packet_sink(self, sink: TransportSink) -> None:
|
||||||
self.sink = sink
|
self.sink = sink
|
||||||
|
|||||||
0
bumble/vendor/zephyr/__init__.py
vendored
Normal file
0
bumble/vendor/zephyr/__init__.py
vendored
Normal file
88
bumble/vendor/zephyr/hci.py
vendored
Normal file
88
bumble/vendor/zephyr/hci.py
vendored
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
# Copyright 2021-2023 Google LLC
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Imports
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
from bumble.hci import (
|
||||||
|
hci_vendor_command_op_code,
|
||||||
|
HCI_Command,
|
||||||
|
STATUS_SPEC,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Constants
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Zephyr RTOS Vendor Specific Commands and Events.
|
||||||
|
# Only a subset of the commands are implemented here currently.
|
||||||
|
#
|
||||||
|
# pylint: disable-next=line-too-long
|
||||||
|
# See https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
|
||||||
|
HCI_WRITE_TX_POWER_LEVEL_COMMAND = hci_vendor_command_op_code(0x000E)
|
||||||
|
HCI_READ_TX_POWER_LEVEL_COMMAND = hci_vendor_command_op_code(0x000F)
|
||||||
|
|
||||||
|
HCI_Command.register_commands(globals())
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
class TX_Power_Level_Command:
|
||||||
|
'''
|
||||||
|
Base class for read and write TX power level HCI commands
|
||||||
|
'''
|
||||||
|
|
||||||
|
TX_POWER_HANDLE_TYPE_ADV = 0x00
|
||||||
|
TX_POWER_HANDLE_TYPE_SCAN = 0x01
|
||||||
|
TX_POWER_HANDLE_TYPE_CONN = 0x02
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@HCI_Command.command(
|
||||||
|
fields=[('handle_type', 1), ('connection_handle', 2), ('tx_power_level', -1)],
|
||||||
|
return_parameters_fields=[
|
||||||
|
('status', STATUS_SPEC),
|
||||||
|
('handle_type', 1),
|
||||||
|
('connection_handle', 2),
|
||||||
|
('selected_tx_power_level', -1),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
class HCI_Write_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
|
||||||
|
'''
|
||||||
|
Write TX power level. See BT_HCI_OP_VS_WRITE_TX_POWER_LEVEL in
|
||||||
|
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
|
||||||
|
|
||||||
|
Power level is in dB. Connection handle for TX_POWER_HANDLE_TYPE_ADV and
|
||||||
|
TX_POWER_HANDLE_TYPE_SCAN should be zero.
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@HCI_Command.command(
|
||||||
|
fields=[('handle_type', 1), ('connection_handle', 2)],
|
||||||
|
return_parameters_fields=[
|
||||||
|
('status', STATUS_SPEC),
|
||||||
|
('handle_type', 1),
|
||||||
|
('connection_handle', 2),
|
||||||
|
('tx_power_level', -1),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
class HCI_Read_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
|
||||||
|
'''
|
||||||
|
Read TX power level. See BT_HCI_OP_VS_READ_TX_POWER_LEVEL in
|
||||||
|
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
|
||||||
|
|
||||||
|
Power level is in dB. Connection handle for TX_POWER_HANDLE_TYPE_ADV and
|
||||||
|
TX_POWER_HANDLE_TYPE_SCAN should be zero.
|
||||||
|
'''
|
||||||
@@ -64,6 +64,7 @@ nav:
|
|||||||
- Linux: platforms/linux.md
|
- Linux: platforms/linux.md
|
||||||
- Windows: platforms/windows.md
|
- Windows: platforms/windows.md
|
||||||
- Android: platforms/android.md
|
- Android: platforms/android.md
|
||||||
|
- Zephyr: platforms/zephyr.md
|
||||||
- Examples:
|
- Examples:
|
||||||
- Overview: examples/index.md
|
- Overview: examples/index.md
|
||||||
|
|
||||||
|
|||||||
BIN
docs/mkdocs/src/downloads/zephyr/hci_usb.zip
Normal file
BIN
docs/mkdocs/src/downloads/zephyr/hci_usb.zip
Normal file
Binary file not shown.
@@ -9,3 +9,4 @@ For platform-specific information, see the following pages:
|
|||||||
* :material-linux: Linux - see the [Linux platform page](linux.md)
|
* :material-linux: Linux - see the [Linux platform page](linux.md)
|
||||||
* :material-microsoft-windows: Windows - see the [Windows platform page](windows.md)
|
* :material-microsoft-windows: Windows - see the [Windows platform page](windows.md)
|
||||||
* :material-android: Android - see the [Android platform page](android.md)
|
* :material-android: Android - see the [Android platform page](android.md)
|
||||||
|
* :material-memory: Zephyr - see the [Zephyr platform page](zephyr.md)
|
||||||
|
|||||||
51
docs/mkdocs/src/platforms/zephyr.md
Normal file
51
docs/mkdocs/src/platforms/zephyr.md
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
:material-memory: ZEPHYR PLATFORM
|
||||||
|
=================================
|
||||||
|
|
||||||
|
Set TX Power on nRF52840
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
The Nordic nRF52840 supports Zephyr's vendor specific HCI command for setting TX
|
||||||
|
power during advertising, connection, or scanning. With the example [HCI
|
||||||
|
USB](https://docs.zephyrproject.org/latest/samples/bluetooth/hci_usb/README.html)
|
||||||
|
application, an [nRF52840
|
||||||
|
dongle](https://www.nordicsemi.com/Products/Development-
|
||||||
|
hardware/nRF52840-Dongle) can be used as a Bumble controller.
|
||||||
|
|
||||||
|
To add dynamic TX power support to the HCI USB application, add the following to
|
||||||
|
`zephyr/samples/bluetooth/hci_usb/prj.conf` and build.
|
||||||
|
|
||||||
|
```
|
||||||
|
CONFIG_BT_CTLR_ADVANCED_FEATURES=y
|
||||||
|
CONFIG_BT_CTLR_CONN_RSSI=y
|
||||||
|
CONFIG_BT_CTLR_TX_PWR_DYNAMIC_CONTROL=y
|
||||||
|
```
|
||||||
|
|
||||||
|
Alternatively, a prebuilt firmware application can be downloaded here:
|
||||||
|
[hci_usb.zip](../downloads/zephyr/hci_usb.zip).
|
||||||
|
|
||||||
|
Put the nRF52840 dongle into bootloader mode by pressing the RESET button. The
|
||||||
|
LED should pulse red. Load the firmware application with the `nrfutil` tool:
|
||||||
|
|
||||||
|
```
|
||||||
|
nrfutil dfu usb-serial -pkg hci_usb.zip -p /dev/ttyACM0
|
||||||
|
```
|
||||||
|
|
||||||
|
The vendor specific HCI commands to read and write TX power are defined in
|
||||||
|
`bumble/vendor/zephyr/hci.py` and may be used as such:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from bumble.vendor.zephyr.hci import HCI_Write_Tx_Power_Level_Command
|
||||||
|
|
||||||
|
# set advertising power to -4 dB
|
||||||
|
response = await host.send_command(
|
||||||
|
HCI_Write_Tx_Power_Level_Command(
|
||||||
|
handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV,
|
||||||
|
connection_handle=0,
|
||||||
|
tx_power_level=-4,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.return_parameters.status == HCI_SUCCESS:
|
||||||
|
print(f"TX power set to {response.return_parameters.selected_tx_power_level}")
|
||||||
|
|
||||||
|
```
|
||||||
@@ -53,7 +53,7 @@ async def make_hfp_connections(
|
|||||||
client_dlc = await client_mux.open_dlc(rfcomm_channel)
|
client_dlc = await client_mux.open_dlc(rfcomm_channel)
|
||||||
server_dlc = await wait_dlc
|
server_dlc = await wait_dlc
|
||||||
|
|
||||||
# Setup HFP connnection
|
# Setup HFP connection
|
||||||
hf = hfp.HfProtocol(client_dlc, hf_config)
|
hf = hfp.HfProtocol(client_dlc, hf_config)
|
||||||
ag = hfp.HfpProtocol(server_dlc)
|
ag = hfp.HfpProtocol(server_dlc)
|
||||||
return hf, ag
|
return hf, ag
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import pathlib
|
||||||
|
import pytest
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
|
|
||||||
@@ -83,87 +85,95 @@ JSON3 = """
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
async def test_basic():
|
@pytest.fixture
|
||||||
with tempfile.NamedTemporaryFile(mode="r+", encoding='utf-8') as file:
|
def temporary_file():
|
||||||
keystore = JsonKeyStore('my_namespace', file.name)
|
file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
file.close()
|
||||||
|
yield file.name
|
||||||
|
pathlib.Path(file.name).unlink()
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
async def test_basic(temporary_file):
|
||||||
|
with open(temporary_file, mode='w', encoding='utf-8') as file:
|
||||||
file.write("{}")
|
file.write("{}")
|
||||||
file.flush()
|
file.flush()
|
||||||
|
|
||||||
keys = await keystore.get_all()
|
keystore = JsonKeyStore('my_namespace', temporary_file)
|
||||||
assert len(keys) == 0
|
|
||||||
|
|
||||||
keys = PairingKeys()
|
keys = await keystore.get_all()
|
||||||
await keystore.update('foo', keys)
|
assert len(keys) == 0
|
||||||
foo = await keystore.get('foo')
|
|
||||||
assert foo is not None
|
|
||||||
assert foo.ltk is None
|
|
||||||
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
|
|
||||||
keys.ltk = PairingKeys.Key(ltk)
|
|
||||||
await keystore.update('foo', keys)
|
|
||||||
foo = await keystore.get('foo')
|
|
||||||
assert foo is not None
|
|
||||||
assert foo.ltk is not None
|
|
||||||
assert foo.ltk.value == ltk
|
|
||||||
|
|
||||||
file.flush()
|
keys = PairingKeys()
|
||||||
with open(file.name, "r", encoding="utf-8") as json_file:
|
await keystore.update('foo', keys)
|
||||||
json_data = json.load(json_file)
|
foo = await keystore.get('foo')
|
||||||
assert 'my_namespace' in json_data
|
assert foo is not None
|
||||||
assert 'foo' in json_data['my_namespace']
|
assert foo.ltk is None
|
||||||
assert 'ltk' in json_data['my_namespace']['foo']
|
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
|
||||||
|
keys.ltk = PairingKeys.Key(ltk)
|
||||||
|
await keystore.update('foo', keys)
|
||||||
|
foo = await keystore.get('foo')
|
||||||
|
assert foo is not None
|
||||||
|
assert foo.ltk is not None
|
||||||
|
assert foo.ltk.value == ltk
|
||||||
|
|
||||||
|
with open(file.name, "r", encoding="utf-8") as json_file:
|
||||||
|
json_data = json.load(json_file)
|
||||||
|
assert 'my_namespace' in json_data
|
||||||
|
assert 'foo' in json_data['my_namespace']
|
||||||
|
assert 'ltk' in json_data['my_namespace']['foo']
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
async def test_parsing():
|
async def test_parsing(temporary_file):
|
||||||
with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file:
|
with open(temporary_file, mode='w', encoding='utf-8') as file:
|
||||||
keystore = JsonKeyStore('my_namespace', file.name)
|
|
||||||
file.write(JSON1)
|
file.write(JSON1)
|
||||||
file.flush()
|
file.flush()
|
||||||
|
|
||||||
foo = await keystore.get('14:7D:DA:4E:53:A8/P')
|
keystore = JsonKeyStore('my_namespace', file.name)
|
||||||
assert foo is not None
|
foo = await keystore.get('14:7D:DA:4E:53:A8/P')
|
||||||
assert foo.ltk.value == bytes.fromhex('d1897ee10016eb1a08e4e037fd54c683')
|
assert foo is not None
|
||||||
|
assert foo.ltk.value == bytes.fromhex('d1897ee10016eb1a08e4e037fd54c683')
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
async def test_default_namespace():
|
async def test_default_namespace(temporary_file):
|
||||||
with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file:
|
with open(temporary_file, mode='w', encoding='utf-8') as file:
|
||||||
keystore = JsonKeyStore(None, file.name)
|
|
||||||
file.write(JSON1)
|
file.write(JSON1)
|
||||||
file.flush()
|
file.flush()
|
||||||
|
|
||||||
all_keys = await keystore.get_all()
|
keystore = JsonKeyStore(None, file.name)
|
||||||
assert len(all_keys) == 1
|
all_keys = await keystore.get_all()
|
||||||
name, keys = all_keys[0]
|
assert len(all_keys) == 1
|
||||||
assert name == '14:7D:DA:4E:53:A8/P'
|
name, keys = all_keys[0]
|
||||||
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
|
assert name == '14:7D:DA:4E:53:A8/P'
|
||||||
|
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
|
||||||
|
|
||||||
with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file:
|
with open(temporary_file, mode='w', encoding='utf-8') as file:
|
||||||
keystore = JsonKeyStore(None, file.name)
|
|
||||||
file.write(JSON2)
|
file.write(JSON2)
|
||||||
file.flush()
|
file.flush()
|
||||||
|
|
||||||
keys = PairingKeys()
|
keystore = JsonKeyStore(None, file.name)
|
||||||
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
|
keys = PairingKeys()
|
||||||
keys.ltk = PairingKeys.Key(ltk)
|
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
|
||||||
await keystore.update('foo', keys)
|
keys.ltk = PairingKeys.Key(ltk)
|
||||||
file.flush()
|
await keystore.update('foo', keys)
|
||||||
with open(file.name, "r", encoding="utf-8") as json_file:
|
with open(file.name, "r", encoding="utf-8") as json_file:
|
||||||
json_data = json.load(json_file)
|
json_data = json.load(json_file)
|
||||||
assert '__DEFAULT__' in json_data
|
assert '__DEFAULT__' in json_data
|
||||||
assert 'foo' in json_data['__DEFAULT__']
|
assert 'foo' in json_data['__DEFAULT__']
|
||||||
assert 'ltk' in json_data['__DEFAULT__']['foo']
|
assert 'ltk' in json_data['__DEFAULT__']['foo']
|
||||||
|
|
||||||
with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file:
|
with open(temporary_file, mode='w', encoding='utf-8') as file:
|
||||||
keystore = JsonKeyStore(None, file.name)
|
|
||||||
file.write(JSON3)
|
file.write(JSON3)
|
||||||
file.flush()
|
file.flush()
|
||||||
|
|
||||||
all_keys = await keystore.get_all()
|
keystore = JsonKeyStore(None, file.name)
|
||||||
assert len(all_keys) == 1
|
all_keys = await keystore.get_all()
|
||||||
name, keys = all_keys[0]
|
assert len(all_keys) == 1
|
||||||
assert name == '14:7D:DA:4E:53:A8/P'
|
name, keys = all_keys[0]
|
||||||
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
|
assert name == '14:7D:DA:4E:53:A8/P'
|
||||||
|
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user