Compare commits

...

28 Commits

Author SHA1 Message Date
Lucas Abel
4c6320f98a Merge pull request #142 from AlanRosenthal/main
Fix small bug with services set via --device-config
2023-03-23 12:47:14 -07:00
Lucas Abel
cc0d56ad14 Merge pull request #152 from duohoo/g722_decoder
Add G722 decoder with pure python implementation
2023-03-23 12:45:07 -07:00
Lucas Abel
0019fa8e79 Merge pull request #149 from yuyangh/yuyangh/add_ASHA_event_emit
Add ASHA event emitter
2023-03-23 12:44:42 -07:00
Lucas Abel
7ae1bf8959 Merge pull request #148 from yuyangh/yuyangh/add_audio_status_point
Add ASHA audio status point
2023-03-23 12:43:35 -07:00
Yuyang Huang
9541cb6db0 Add ASHA audio status point 2023-03-23 12:15:10 -07:00
Lucas Abel
1cd13dfc19 Merge pull request #153 from benquike/main
Add 1 bug fix and a few features in bumble
2023-03-23 10:31:02 -07:00
Hui Peng
d4346c3c9b delegate the HCI_PIN_Code_Request event on host 2023-03-23 10:14:56 -07:00
Hui Peng
afe8765508 Add on_pin_code_request to support legacy BT classic pairing 2023-03-23 10:14:56 -07:00
Hui Peng
41d1772cb5 Add test for HCI_PIN_Code_Request_Reply_Command 2023-03-23 10:14:51 -07:00
Hui Peng
6e9078d60e Add implemenetation of HCI_PIN_Code_Request_Reply_Command 2023-03-23 09:50:50 -07:00
Hui Peng
d5c7d0db57 Fix a bug in HCI_Object.dict_from_bytes 2023-03-23 08:57:10 -07:00
Hui Peng
b70ebdef73 Allow Device.enable_classic to be configurable 2023-03-23 08:56:32 -07:00
Duo Ho
3af027e234 fix comments 2023-03-23 04:36:02 +00:00
Gilles Boccon-Gibod
6e719ca9fd Merge pull request #147 from google/gbg/btbench
add benchmark tool and doc
2023-03-22 21:13:24 -07:00
Duo Ho
1a580d1c1e Add G722 decoder with pure python implementation 2023-03-23 03:07:45 +00:00
Alan Rosenthal
aee7348687 Merge pull request #151 from AlanRosenthal/alan/add-types-via-pytypes
Used pytype to find some missing types / fix small issue
2023-03-22 20:58:25 -04:00
Gilles Boccon-Gibod
864889ccab rename .run to .spawn 2023-03-22 17:26:32 -07:00
Alan Rosenthal
fda00dcb28 Used pytype to find some missing types
```
pytype --pythonpath . ./bumble/device.py
```
2023-03-22 14:46:41 +00:00
Yuyang Huang
77e5618ce7 Add ASHA event emitter 2023-03-21 18:00:50 -07:00
Yuyang Huang
6fa857ad13 Add ASHA event emitter 2023-03-21 15:38:29 -07:00
Gilles Boccon-Gibod
bc29f327ef address PR comments, take 2. 2023-03-21 15:33:34 -07:00
Gilles Boccon-Gibod
1894b96de4 address PR comments 2023-03-21 15:01:46 -07:00
Gilles Boccon-Gibod
c4fb63d35c Merge pull request #146 from google/gbg/snoop-file
add auto-snooping for transports
2023-03-21 09:15:07 -07:00
Gilles Boccon-Gibod
33ae047765 add reversed role example doc 2023-03-20 18:35:22 -07:00
Gilles Boccon-Gibod
1efa2e9d44 add benchmark tool and doc 2023-03-20 18:25:21 -07:00
Gilles Boccon-Gibod
aa9af61cbe improve exception messages 2023-03-20 12:14:28 -07:00
Gilles Boccon-Gibod
dc3ac3060e add auto-snooping for transports 2023-03-20 11:06:50 -07:00
Alan Rosenthal
c34c5fdf17 Fix small bug with services set via --device-config
before:
```
  File "/home/alanrosenthal/code/fitbit/bumble/bumble/gatt.py", line 572, in __str__
    f'Descriptor(handle=0x{self.handle:04X}, '
  File "/home/alanrosenthal/code/fitbit/bumble/bumble/att.py", line 756, in read_value
    self.permissions & self.READ_REQUIRES_ENCRYPTION
TypeError: unsupported operand type(s) for &: 'str' and 'int'
```
2023-03-14 18:16:46 -04:00
25 changed files with 2312 additions and 67 deletions

1207
apps/bench.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -30,6 +30,8 @@ from bumble.hci import (
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
@@ -45,11 +47,20 @@ from bumble.host import Host
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
def command_succeeded(response):
if isinstance(response, HCI_Command_Status_Event):
return response.status == HCI_SUCCESS
if isinstance(response, HCI_Command_Complete_Event):
return response.return_parameters.status == HCI_SUCCESS
return False
# -----------------------------------------------------------------------------
async def get_classic_info(host):
if host.supports_command(HCI_READ_BD_ADDR_COMMAND):
response = await host.send_command(HCI_Read_BD_ADDR_Command())
if response.return_parameters.status == HCI_SUCCESS:
if command_succeeded(response):
print()
print(
color('Classic Address:', 'yellow'), response.return_parameters.bd_addr
@@ -57,7 +68,7 @@ async def get_classic_info(host):
if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND):
response = await host.send_command(HCI_Read_Local_Name_Command())
if response.return_parameters.status == HCI_SUCCESS:
if command_succeeded(response):
print()
print(
color('Local Name:', 'yellow'),
@@ -73,7 +84,7 @@ async def get_le_info(host):
response = await host.send_command(
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
)
if response.return_parameters.status == HCI_SUCCESS:
if command_succeeded(response):
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response.return_parameters.num_supported_advertising_sets,
@@ -84,7 +95,7 @@ async def get_le_info(host):
response = await host.send_command(
HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
)
if response.return_parameters.status == HCI_SUCCESS:
if command_succeeded(response):
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response.return_parameters.max_advertising_data_length,
@@ -93,7 +104,7 @@ async def get_le_info(host):
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
if command_succeeded(response):
print(
color('Maximum Data Length:', 'yellow'),
(

View File

@@ -23,11 +23,12 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import functools
import struct
from pyee import EventEmitter
from typing import Dict, Type, TYPE_CHECKING
from bumble.core import UUID, name_or_number
from bumble.core import UUID, name_or_number, get_dict_key_by_value
from bumble.hci import HCI_Object, key_with_value
from bumble.colors import color
@@ -725,11 +726,33 @@ class Attribute(EventEmitter):
READ_REQUIRES_AUTHORIZATION = 0x40
WRITE_REQUIRES_AUTHORIZATION = 0x80
PERMISSION_NAMES = {
READABLE: 'READABLE',
WRITEABLE: 'WRITEABLE',
READ_REQUIRES_ENCRYPTION: 'READ_REQUIRES_ENCRYPTION',
WRITE_REQUIRES_ENCRYPTION: 'WRITE_REQUIRES_ENCRYPTION',
READ_REQUIRES_AUTHENTICATION: 'READ_REQUIRES_AUTHENTICATION',
WRITE_REQUIRES_AUTHENTICATION: 'WRITE_REQUIRES_AUTHENTICATION',
READ_REQUIRES_AUTHORIZATION: 'READ_REQUIRES_AUTHORIZATION',
WRITE_REQUIRES_AUTHORIZATION: 'WRITE_REQUIRES_AUTHORIZATION',
}
@staticmethod
def string_to_permissions(permissions_str: str):
return functools.reduce(
lambda x, y: x | get_dict_key_by_value(Attribute.PERMISSION_NAMES, y),
permissions_str.split(","),
0,
)
def __init__(self, attribute_type, permissions, value=b''):
EventEmitter.__init__(self)
self.handle = 0
self.end_group_handle = 0
self.permissions = permissions
if isinstance(permissions, str):
self.permissions = self.string_to_permissions(permissions)
else:
self.permissions = permissions
# Convert the type to a UUID object if it isn't already
if isinstance(attribute_type, str):

416
bumble/decoder.py Normal file
View File

@@ -0,0 +1,416 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# fmt: off
WL = [-60, -30, 58, 172, 334, 538, 1198, 3042]
RL42 = [0, 7, 6, 5, 4, 3, 2, 1, 7, 6, 5, 4, 3, 2, 1, 0]
ILB = [
2048,
2093,
2139,
2186,
2233,
2282,
2332,
2383,
2435,
2489,
2543,
2599,
2656,
2714,
2774,
2834,
2896,
2960,
3025,
3091,
3158,
3228,
3298,
3371,
3444,
3520,
3597,
3676,
3756,
3838,
3922,
4008,
]
WH = [0, -214, 798]
RH2 = [2, 1, 2, 1]
# Values in QM2/QM4/QM6 left shift three bits than original g722 specification.
QM2 = [-7408, -1616, 7408, 1616]
QM4 = [
0,
-20456,
-12896,
-8968,
-6288,
-4240,
-2584,
-1200,
20456,
12896,
8968,
6288,
4240,
2584,
1200,
0,
]
QM6 = [
-136,
-136,
-136,
-136,
-24808,
-21904,
-19008,
-16704,
-14984,
-13512,
-12280,
-11192,
-10232,
-9360,
-8576,
-7856,
-7192,
-6576,
-6000,
-5456,
-4944,
-4464,
-4008,
-3576,
-3168,
-2776,
-2400,
-2032,
-1688,
-1360,
-1040,
-728,
24808,
21904,
19008,
16704,
14984,
13512,
12280,
11192,
10232,
9360,
8576,
7856,
7192,
6576,
6000,
5456,
4944,
4464,
4008,
3576,
3168,
2776,
2400,
2032,
1688,
1360,
1040,
728,
432,
136,
-432,
-136,
]
QMF_COEFFS = [3, -11, 12, 32, -210, 951, 3876, -805, 362, -156, 53, -11]
# fmt: on
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class G722Decoder(object):
"""G.722 decoder with bitrate 64kbit/s.
For the Blocks in the sub-band decoders, please refer to the G.722
specification for the required information. G722 specification:
https://www.itu.int/rec/T-REC-G.722-201209-I
"""
def __init__(self):
self._x = [0] * 24
self._band = [Band(), Band()]
# The initial value in BLOCK 3L
self._band[0].det = 32
# The initial value in BLOCK 3H
self._band[1].det = 8
def decode_frame(self, encoded_data) -> bytearray:
result_array = bytearray(len(encoded_data) * 4)
self.g722_decode(result_array, encoded_data)
return result_array
def g722_decode(self, result_array, encoded_data) -> int:
"""Decode the data frame using g722 decoder."""
result_length = 0
for code in encoded_data:
higher_bits = (code >> 6) & 0x03
lower_bits = code & 0x3F
rlow = self.lower_sub_band_decoder(lower_bits)
rhigh = self.higher_sub_band_decoder(higher_bits)
# Apply the receive QMF
self._x[:22] = self._x[2:]
self._x[22] = rlow + rhigh
self._x[23] = rlow - rhigh
xout2 = sum(self._x[2 * i] * QMF_COEFFS[i] for i in range(12))
xout1 = sum(self._x[2 * i + 1] * QMF_COEFFS[11 - i] for i in range(12))
result_length = self.update_decoded_result(
xout1, result_length, result_array
)
result_length = self.update_decoded_result(
xout2, result_length, result_array
)
return result_length
def update_decoded_result(self, xout, byte_length, byte_array) -> int:
result = (int)(xout >> 11)
bytes_result = result.to_bytes(2, 'little', signed=True)
byte_array[byte_length] = bytes_result[0]
byte_array[byte_length + 1] = bytes_result[1]
return byte_length + 2
def lower_sub_band_decoder(self, lower_bits) -> int:
"""Lower sub-band decoder for last six bits."""
# Block 5L
# INVQBL
wd1 = lower_bits
wd2 = QM6[wd1]
wd1 >>= 2
wd2 = (self._band[0].det * wd2) >> 15
# RECONS
rlow = self._band[0].s + wd2
# Block 6L
# LIMIT
if rlow > 16383:
rlow = 16383
elif rlow < -16384:
rlow = -16384
# Block 2L
# INVQAL
wd2 = QM4[wd1]
dlowt = (self._band[0].det * wd2) >> 15
# Block 3L
# LOGSCL
wd2 = RL42[wd1]
wd1 = (self._band[0].nb * 127) >> 7
wd1 += WL[wd2]
if wd1 < 0:
wd1 = 0
elif wd1 > 18432:
wd1 = 18432
self._band[0].nb = wd1
# SCALEL
wd1 = (self._band[0].nb >> 6) & 31
wd2 = 8 - (self._band[0].nb >> 11)
if wd2 < 0:
wd3 = ILB[wd1] << -wd2
else:
wd3 = ILB[wd1] >> wd2
self._band[0].det = wd3 << 2
# Block 4L
self._band[0].block4(dlowt)
return rlow
def higher_sub_band_decoder(self, higher_bits) -> int:
"""Higher sub-band decoder for first two bits."""
# Block 2H
# INVQAH
wd2 = QM2[higher_bits]
dhigh = (self._band[1].det * wd2) >> 15
# Block 5H
# RECONS
rhigh = dhigh + self._band[1].s
# Block 6H
# LIMIT
if rhigh > 16383:
rhigh = 16383
elif rhigh < -16384:
rhigh = -16384
# Block 3H
# LOGSCH
wd2 = RH2[higher_bits]
wd1 = (self._band[1].nb * 127) >> 7
wd1 += WH[wd2]
if wd1 < 0:
wd1 = 0
elif wd1 > 22528:
wd1 = 22528
self._band[1].nb = wd1
# SCALEH
wd1 = (self._band[1].nb >> 6) & 31
wd2 = 10 - (self._band[1].nb >> 11)
if wd2 < 0:
wd3 = ILB[wd1] << -wd2
else:
wd3 = ILB[wd1] >> wd2
self._band[1].det = wd3 << 2
# Block 4H
self._band[1].block4(dhigh)
return rhigh
# -----------------------------------------------------------------------------
class Band(object):
"""Structure for G722 decode proccessing."""
s: int = 0
nb: int = 0
det: int = 0
def __init__(self):
self._sp = 0
self._sz = 0
self._r = [0] * 3
self._a = [0] * 3
self._ap = [0] * 3
self._p = [0] * 3
self._d = [0] * 7
self._b = [0] * 7
self._bp = [0] * 7
self._sg = [0] * 7
def saturate(self, amp: int) -> int:
if amp > 32767:
return 32767
elif amp < -32768:
return -32768
else:
return amp
def block4(self, d: int) -> None:
"""Block4 for both lower and higher sub-band decoder."""
wd1 = 0
wd2 = 0
wd3 = 0
# RECONS
self._d[0] = d
self._r[0] = self.saturate(self.s + d)
# PARREC
self._p[0] = self.saturate(self._sz + d)
# UPPOL2
for i in range(3):
self._sg[i] = (self._p[i]) >> 15
wd1 = self.saturate((self._a[1]) << 2)
wd2 = -wd1 if self._sg[0] == self._sg[1] else wd1
if wd2 > 32767:
wd2 = 32767
wd3 = 128 if self._sg[0] == self._sg[2] else -128
wd3 += wd2 >> 7
wd3 += (self._a[2] * 32512) >> 15
if wd3 > 12288:
wd3 = 12288
elif wd3 < -12288:
wd3 = -12288
self._ap[2] = wd3
# UPPOL1
self._sg[0] = (self._p[0]) >> 15
self._sg[1] = (self._p[1]) >> 15
wd1 = 192 if self._sg[0] == self._sg[1] else -192
wd2 = (self._a[1] * 32640) >> 15
self._ap[1] = self.saturate(wd1 + wd2)
wd3 = self.saturate(15360 - self._ap[2])
if self._ap[1] > wd3:
self._ap[1] = wd3
elif self._ap[1] < -wd3:
self._ap[1] = -wd3
# UPZERO
wd1 = 0 if d == 0 else 128
self._sg[0] = d >> 15
for i in range(1, 7):
self._sg[i] = (self._d[i]) >> 15
wd2 = wd1 if self._sg[i] == self._sg[0] else -wd1
wd3 = (self._b[i] * 32640) >> 15
self._bp[i] = self.saturate(wd2 + wd3)
# DELAYA
for i in range(6, 0, -1):
self._d[i] = self._d[i - 1]
self._b[i] = self._bp[i]
for i in range(2, 0, -1):
self._r[i] = self._r[i - 1]
self._p[i] = self._p[i - 1]
self._a[i] = self._ap[i]
# FILTEP
self._sp = 0
for i in range(1, 3):
wd1 = self.saturate(self._r[i] + self._r[i])
self._sp += (self._a[i] * wd1) >> 15
self._sp = self.saturate(self._sp)
# FILTEZ
self._sz = 0
for i in range(6, 0, -1):
wd1 = self.saturate(self._d[i] + self._d[i])
self._sz += (self._b[i] * wd1) >> 15
self._sz = self.saturate(self._sz)
# PREDIC
self.s = self.saturate(self._sp + self._sz)

View File

@@ -50,6 +50,7 @@ from .hci import (
HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND,
HCI_LE_RAND_COMMAND,
HCI_LE_READ_PHY_COMMAND,
HCI_LE_SET_PHY_COMMAND,
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
@@ -94,6 +95,8 @@ from .hci import (
HCI_LE_Set_Scan_Enable_Command,
HCI_LE_Set_Scan_Parameters_Command,
HCI_LE_Set_Scan_Response_Data_Command,
HCI_PIN_Code_Request_Reply_Command,
HCI_PIN_Code_Request_Negative_Reply_Command,
HCI_Read_BD_ADDR_Command,
HCI_Read_RSSI_Command,
HCI_Reject_Connection_Request_Command,
@@ -310,6 +313,9 @@ class AdvertisementDataAccumulator:
def update(self, report):
advertisement = Advertisement.from_advertising_report(report)
if advertisement is None:
return None
result = None
if advertisement.is_scan_response:
@@ -739,6 +745,7 @@ class DeviceConfiguration:
self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = True
self.classic_enabled = False
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.classic_accept_any = True
@@ -768,6 +775,7 @@ class DeviceConfiguration:
self.le_simultaneous_enabled = config.get(
'le_simultaneous_enabled', self.le_simultaneous_enabled
)
self.classic_enabled = config.get('classic_enabled', self.classic_enabled)
self.classic_sc_enabled = config.get(
'classic_sc_enabled', self.classic_sc_enabled
)
@@ -979,6 +987,7 @@ class Device(CompositeEventEmitter):
self.keystore = KeyStore.create_for_device(config)
self.irk = config.irk
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled
@@ -999,7 +1008,7 @@ class Device(CompositeEventEmitter):
new_characteristic = Characteristic(
uuid=characteristic["uuid"],
properties=characteristic["properties"],
permissions=int(characteristic["permissions"], 0),
permissions=characteristic["permissions"],
descriptors=descriptors,
)
characteristics.append(new_characteristic)
@@ -1242,6 +1251,11 @@ class Device(CompositeEventEmitter):
# Done
self.powered_on = True
async def power_off(self) -> None:
if self.powered_on:
await self.host.flush()
self.powered_on = False
def supports_le_feature(self, feature):
return self.host.supports_le_feature(feature)
@@ -1666,7 +1680,7 @@ class Device(CompositeEventEmitter):
)
)
if not phys:
raise ValueError('least one supported PHY needed')
raise ValueError('at least one supported PHY needed')
phy_count = len(phys)
initiating_phys = phy_list_to_bits(phys)
@@ -1807,7 +1821,7 @@ class Device(CompositeEventEmitter):
try:
return await self.abort_on('flush', pending_connection)
except ConnectionError as error:
except core.ConnectionError as error:
raise core.TimeoutError() from error
finally:
self.remove_listener('connection', on_connection)
@@ -2041,21 +2055,31 @@ class Device(CompositeEventEmitter):
async def set_connection_phy(
self, connection, tx_phys=None, rx_phys=None, phy_options=None
):
if not self.host.supports_command(HCI_LE_SET_PHY_COMMAND):
logger.warning('ignoring request, command not supported')
return
all_phys_bits = (1 if tx_phys is None else 0) | (
(1 if rx_phys is None else 0) << 1
)
return await self.send_command(
result = await self.send_command(
HCI_LE_Set_PHY_Command(
connection_handle=connection.handle,
all_phys=all_phys_bits,
tx_phys=phy_list_to_bits(tx_phys),
rx_phys=phy_list_to_bits(rx_phys),
phy_options=0 if phy_options is None else int(phy_options),
),
check_result=True,
)
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Set_PHY_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
async def set_default_phy(self, tx_phys=None, rx_phys=None):
all_phys_bits = (1 if tx_phys is None else 0) | (
(1 if rx_phys is None else 0) << 1
@@ -2494,7 +2518,7 @@ class Device(CompositeEventEmitter):
self.advertising = False
# Notify listeners
error = ConnectionError(
error = core.ConnectionError(
error_code,
transport,
peer_address,
@@ -2567,7 +2591,7 @@ class Device(CompositeEventEmitter):
@with_connection_from_handle
def on_disconnection_failure(self, connection, error_code):
logger.debug(f'*** Disconnection failed: {error_code}')
error = ConnectionError(
error = core.ConnectionError(
error_code,
connection.transport,
connection.peer_address,
@@ -2762,6 +2786,51 @@ class Device(CompositeEventEmitter):
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_pin_code_request(self, connection):
# classic legacy pairing
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_input = pairing_config.delegate.io_capability in (
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
)
# respond the pin code
if can_input:
async def get_pin_code():
pin_code = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
)
if pin_code is not None:
pin_code = bytes(str(pin_code).zfill(6))
await self.host.send_command(
HCI_PIN_Code_Request_Reply_Command(
bd_addr=connection.peer_address,
pin_code_length=len(pin_code),
pin_code=pin_code,
)
)
else:
await self.host.send_command(
HCI_PIN_Code_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
)
asyncio.create_task(get_pin_code())
else:
self.host.send_command_sync(
HCI_PIN_Code_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
@@ -2776,7 +2845,7 @@ class Device(CompositeEventEmitter):
# [Classic only]
@host_event_handler
@try_with_connection_from_address
def on_remote_name(self, connection, address, remote_name):
def on_remote_name(self, connection: Connection, address, remote_name):
# Try to decode the name
try:
remote_name = remote_name.decode('utf-8')
@@ -2794,7 +2863,7 @@ class Device(CompositeEventEmitter):
# [Classic only]
@host_event_handler
@try_with_connection_from_address
def on_remote_name_failure(self, connection, address, error):
def on_remote_name_failure(self, connection: Connection, address, error):
if connection:
connection.emit('remote_name_failure', error)
self.emit('remote_name_failure', address, error)

View File

@@ -691,7 +691,7 @@ class Server(EventEmitter):
length=entry_size, attribute_data_list=b''.join(attribute_data_list)
)
else:
logging.warning(f"not found {request}")
logging.debug(f"not found {request}")
self.send_response(connection, response)

View File

@@ -1491,7 +1491,7 @@ class HCI_Object:
elif field_type == -2:
# 16-bit signed
field_value = struct.unpack_from('<h', data, offset)[0]
offset += 1
offset += 2
elif field_type == 3:
# 24-bit unsigned
padded = data[offset : offset + 3] + bytes([0])
@@ -2097,6 +2097,24 @@ class HCI_Link_Key_Request_Negative_Reply_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('pin_code_length', 1),
('pin_code', '*'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address),
],
)
class HCI_PIN_Code_Request_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.12 PIN Code Request Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('bd_addr', Address.parse_address)],

View File

@@ -53,7 +53,6 @@ from .hci import (
HCI_LE_Write_Suggested_Default_Data_Length_Command,
HCI_Link_Key_Request_Negative_Reply_Command,
HCI_Link_Key_Request_Reply_Command,
HCI_PIN_Code_Request_Negative_Reply_Command,
HCI_Packet,
HCI_Read_Buffer_Size_Command,
HCI_Read_Local_Supported_Commands_Command,
@@ -276,7 +275,7 @@ class Host(AbortableEventEmitter):
def send_hci_packet(self, packet):
if self.snooper:
self.snooper.snoop(packet, Snooper.Direction.HOST_TO_CONTROLLER)
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
self.hci_sink.on_packet(packet.to_bytes())
@@ -425,7 +424,7 @@ class Host(AbortableEventEmitter):
logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}')
if self.snooper:
self.snooper.snoop(packet, Snooper.Direction.CONTROLLER_TO_HOST)
self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST)
# If the packet is a command, invoke the handler for this packet
if packet.hci_packet_type == HCI_COMMAND_PACKET:
@@ -794,11 +793,7 @@ class Host(AbortableEventEmitter):
)
def on_hci_pin_code_request_event(self, event):
# For now, just refuse all requests
# TODO: delegate the decision
self.send_command_sync(
HCI_PIN_Code_Request_Negative_Reply_Command(bd_addr=event.bd_addr)
)
self.emit('pin_code_request', event.bd_addr)
def on_hci_link_key_request_event(self, event):
async def send_link_key():

View File

@@ -796,6 +796,11 @@ class Channel(EventEmitter):
self.disconnection_result = asyncio.get_running_loop().create_future()
return await self.disconnection_result
def abort(self):
if self.state == self.OPEN:
self.change_state(self.CLOSED)
self.emit('close')
def send_configure_request(self):
options = L2CAP_Control_Frame.encode_configuration_options(
[
@@ -1105,6 +1110,10 @@ class LeConnectionOrientedChannel(EventEmitter):
self.disconnection_result = asyncio.get_running_loop().create_future()
return await self.disconnection_result
def abort(self):
if self.state == self.CONNECTED:
self.change_state(self.DISCONNECTED)
def on_pdu(self, pdu):
if self.sink is None:
logger.warning('received pdu without a sink')
@@ -1492,8 +1501,12 @@ class ChannelManager:
def on_disconnection(self, connection_handle, _reason):
logger.debug(f'disconnection from {connection_handle}, cleaning up channels')
if connection_handle in self.channels:
for _, channel in self.channels[connection_handle].items():
channel.abort()
del self.channels[connection_handle]
if connection_handle in self.le_coc_channels:
for _, channel in self.le_coc_channels[connection_handle].items():
channel.abort()
del self.le_coc_channels[connection_handle]
if connection_handle in self.identifiers:
del self.identifiers[connection_handle]

View File

@@ -20,6 +20,7 @@ import struct
import logging
from typing import List
from ..core import AdvertisingData
from ..device import Device
from ..gatt import (
GATT_ASHA_SERVICE,
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
@@ -31,7 +32,7 @@ from ..gatt import (
Characteristic,
CharacteristicValue,
)
from ..device import Device
from ..utils import AsyncRunner
# -----------------------------------------------------------------------------
# Logging
@@ -55,13 +56,13 @@ class AshaService(TemplateService):
self.hisyncid = hisyncid
self.capability = capability # Device Capabilities [Left, Monaural]
self.device = device
self.emitted_data_name = 'ASHA_data_' + str(self.capability)
self.audio_out_data = b''
self.psm = psm # a non-zero psm is mainly for testing purpose
# Handler for volume control
def on_volume_write(_connection, value):
logger.info(f'--- VOLUME Write:{value[0]}')
self.emit('volume', value[0])
# Handler for audio control commands
def on_audio_control_point_write(_connection, value):
@@ -76,14 +77,28 @@ class AshaService(TemplateService):
f'volume={value[3]}, '
f'otherstate={value[4]}'
)
self.emit(
'start',
{
'codec': value[1],
'audiotype': value[2],
'volume': value[3],
'otherstate': value[4],
},
)
elif opcode == AshaService.OPCODE_STOP:
logger.info('### STOP')
self.emit('stop')
elif opcode == AshaService.OPCODE_STATUS:
logger.info(f'### STATUS: connected={value[1]}')
# TODO Respond with a status
# asyncio.create_task(device.notify_subscribers(audio_status_characteristic,
# force=True))
# OPCODE_STATUS does not need audio status point update
if opcode != AshaService.OPCODE_STATUS:
AsyncRunner.spawn(
device.notify_subscribers(
self.audio_status_characteristic, force=True
)
)
self.read_only_properties_characteristic = Characteristic(
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
@@ -126,7 +141,7 @@ class AshaService(TemplateService):
def on_data(data):
logging.debug(f'<<< data received:{data}')
self.emit(self.emitted_data_name, data)
self.emit('data', data)
self.audio_out_data += data
channel.sink = on_data

View File

@@ -852,17 +852,27 @@ class Server(EventEmitter):
# Register ourselves with the L2CAP channel manager
device.register_l2cap_server(RFCOMM_PSM, self.on_connection)
def listen(self, acceptor):
# Find a free channel number
for channel in range(
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START, RFCOMM_DYNAMIC_CHANNEL_NUMBER_END + 1
):
if channel not in self.acceptors:
self.acceptors[channel] = acceptor
return channel
def listen(self, acceptor, channel=0):
if channel:
if channel in self.acceptors:
# Busy
return 0
else:
# Find a free channel number
for candidate in range(
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START,
RFCOMM_DYNAMIC_CHANNEL_NUMBER_END + 1,
):
if candidate not in self.acceptors:
channel = candidate
break
# All channels used...
return 0
if channel == 0:
# All channels used...
return 0
self.acceptors[channel] = acceptor
return channel
def on_connection(self, l2cap_channel):
logger.debug(f'+++ new L2CAP connection: {l2cap_channel}')

View File

@@ -15,12 +15,21 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from contextlib import contextmanager
from enum import IntEnum
import logging
import struct
import datetime
from typing import BinaryIO
from typing import BinaryIO, Generator
import os
from bumble.hci import HCI_Packet, HCI_COMMAND_PACKET, HCI_EVENT_PACKET
from bumble.hci import HCI_COMMAND_PACKET, HCI_EVENT_PACKET
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
@@ -44,7 +53,7 @@ class Snooper:
HCI_BSCP = 1003
H5 = 1004
def snoop(self, hci_packet: HCI_Packet, direction: Direction) -> None:
def snoop(self, hci_packet: bytes, direction: Direction) -> None:
"""Snoop on an HCI packet."""
@@ -67,9 +76,10 @@ class BtSnooper(Snooper):
self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4)
)
def snoop(self, hci_packet: HCI_Packet, direction: Snooper.Direction) -> None:
def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None:
flags = int(direction)
if hci_packet.hci_packet_type in (HCI_EVENT_PACKET, HCI_COMMAND_PACKET):
packet_type = hci_packet[0]
if packet_type in (HCI_EVENT_PACKET, HCI_COMMAND_PACKET):
flags |= 0x10
# Compute the current timestamp
@@ -79,15 +89,82 @@ class BtSnooper(Snooper):
)
# Emit the record
packet_data = bytes(hci_packet)
self.output.write(
struct.pack(
'>IIIIQ',
len(packet_data), # Original Length
len(packet_data), # Included Length
len(hci_packet), # Original Length
len(hci_packet), # Included Length
flags, # Packet Flags
0, # Cumulative Drops
timestamp, # Timestamp
)
+ packet_data
+ hci_packet
)
# -----------------------------------------------------------------------------
_SNOOPER_INSTANCE_COUNT = 0
@contextmanager
def create_snooper(spec: str) -> Generator[Snooper, None, None]:
"""
Create a snooper given a specification string.
The general syntax for the specification string is:
<snooper-type>:<type-specific-arguments>
Supported snooper types are:
btsnoop
The syntax for the type-specific arguments for this type is:
<io-type>:<io-type-specific-arguments>
Supported I/O types are:
file
The type-specific arguments for this I/O type is a string that is converted
to a file path using the python `str.format()` string formatting. The log
records will be written to that file if it can be opened/created.
The keyword args that may be referenced by the string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.utcnow()`
pid: the current process ID.
instance: the instance ID in the current process.
Examples:
btsnoop:file:my_btsnoop.log
btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log
"""
if ':' not in spec:
raise ValueError('snooper type prefix missing')
snooper_type, snooper_args = spec.split(':', maxsplit=1)
if snooper_type == 'btsnoop':
if ':' not in snooper_args:
raise ValueError('I/O type for btsnoop snooper type missing')
io_type, io_name = snooper_args.split(':', maxsplit=1)
if io_type == 'file':
# Process the file name string pattern.
global _SNOOPER_INSTANCE_COUNT
file_path = io_name.format(
now=datetime.datetime.now(),
utcnow=datetime.datetime.utcnow(),
pid=os.getpid(),
instance=_SNOOPER_INSTANCE_COUNT,
)
# Open the file
logger.debug(f'Snoop file: {file_path}')
with open(file_path, 'wb') as snoop_file:
_SNOOPER_INSTANCE_COUNT += 1
yield BtSnooper(snoop_file)
_SNOOPER_INSTANCE_COUNT -= 1
return
raise ValueError(f'I/O type {io_type} not supported')
raise ValueError(f'snooper type {snooper_type} not found')

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,10 +15,13 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from contextlib import asynccontextmanager
import logging
import os
from .common import Transport, AsyncPipeSink
from .common import Transport, AsyncPipeSink, SnoopingTransport
from ..controller import Controller
from ..snoop import create_snooper
# -----------------------------------------------------------------------------
# Logging
@@ -26,14 +29,53 @@ from ..controller import Controller
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def _wrap_transport(transport: Transport) -> Transport:
"""
Automatically wrap a Transport instance when a wrapping class can be inferred
from the environment.
If no wrapping class is applicable, the transport argument is returned as-is.
"""
# If BUMBLE_SNOOPER is set, try to automatically create a snooper.
if snooper_spec := os.getenv('BUMBLE_SNOOPER'):
try:
return SnoopingTransport.create_with(
transport, create_snooper(snooper_spec)
)
except Exception as exc:
logger.warning(f'Exception while creating snooper: {exc}')
return transport
# -----------------------------------------------------------------------------
async def open_transport(name: str) -> Transport:
'''
"""
Open a transport by name.
The name must be <type>:<parameters>
Where <parameters> depend on the type (and may be empty for some types).
The supported types are: serial,udp,tcp,pty,usb
'''
The supported types are:
* serial
* udp
* tcp-client
* tcp-server
* ws-client
* ws-server
* pty
* file
* vhci
* hci-socket
* usb
* pyusb
* android-emulator
"""
return _wrap_transport(await _open_transport(name))
# -----------------------------------------------------------------------------
async def _open_transport(name: str) -> Transport:
# pylint: disable=import-outside-toplevel
# pylint: disable=too-many-return-statements
@@ -107,7 +149,18 @@ async def open_transport(name: str) -> Transport:
# -----------------------------------------------------------------------------
async def open_transport_or_link(name):
async def open_transport_or_link(name: str) -> Transport:
"""
Open a transport or a link relay.
Args:
name:
Name of the transport or link relay to open.
When the name starts with "link-relay:", open a link relay (see RemoteLink
for details on what the arguments are).
For other namespaces, see `open_transport`.
"""
if name.startswith('link-relay:'):
from ..link import RemoteLink # lazy import
@@ -119,6 +172,6 @@ async def open_transport_or_link(name):
async def close(self):
link.close()
return LinkTransport(controller, AsyncPipeSink(controller))
return _wrap_transport(LinkTransport(controller, AsyncPipeSink(controller)))
return await open_transport(name)

View File

@@ -15,12 +15,16 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import contextlib
import struct
import asyncio
import logging
from typing import ContextManager
from .. import hci
from ..colors import color
from ..snoop import Snooper
# -----------------------------------------------------------------------------
@@ -246,6 +250,20 @@ class StreamPacketSink:
# -----------------------------------------------------------------------------
class Transport:
"""
Base class for all transports.
A Transport represents a source and a sink together.
An instance must be closed by calling close() when no longer used. Instances
implement the ContextManager protocol so that they may be used in a `async with`
statement.
An instance is iterable. The iterator yields, in order, its source and sink, so
that it may be used with a convenient call syntax like:
async with create_transport() as (source, sink):
...
"""
def __init__(self, source, sink):
self.source = source
self.sink = sink
@@ -335,3 +353,60 @@ class PumpedTransport(Transport):
async def close(self):
await super().close()
await self.close_function()
# -----------------------------------------------------------------------------
class SnoopingTransport(Transport):
"""Transport wrapper that snoops on packets to/from a wrapped transport."""
@staticmethod
def create_with(
transport: Transport, snooper: ContextManager[Snooper]
) -> SnoopingTransport:
"""
Create an instance given a snooper that works as as context manager.
The returned instance will exit the snooper context when it is closed.
"""
with contextlib.ExitStack() as exit_stack:
return SnoopingTransport(
transport, exit_stack.enter_context(snooper), exit_stack.pop_all().close
)
raise RuntimeError('unexpected code path') # Satisfy the type checker
class Source:
def __init__(self, source, snooper):
self.source = source
self.snooper = snooper
self.sink = None
def set_packet_sink(self, sink):
self.sink = sink
self.source.set_packet_sink(self)
def on_packet(self, packet):
self.snooper.snoop(packet, Snooper.Direction.CONTROLLER_TO_HOST)
if self.sink:
self.sink.on_packet(packet)
class Sink:
def __init__(self, sink, snooper):
self.sink = sink
self.snooper = snooper
def on_packet(self, packet):
self.snooper.snoop(packet, Snooper.Direction.HOST_TO_CONTROLLER)
if self.sink:
self.sink.on_packet(packet)
def __init__(self, transport, snooper, close_snooper=None):
super().__init__(
self.Source(transport.source, snooper), self.Sink(transport.sink, snooper)
)
self.transport = transport
self.close_snooper = close_snooper
async def close(self):
await self.transport.close()
if self.close_snooper:
self.close_snooper()

View File

@@ -20,7 +20,7 @@ import logging
import traceback
import collections
import sys
from typing import Awaitable, TypeVar
from typing import Awaitable, Set, TypeVar
from functools import wraps
from pyee import EventEmitter
@@ -157,6 +157,9 @@ class AsyncRunner:
# Shared default queue
default_queue = WorkQueue()
# Shared set of running tasks
running_tasks: Set[Awaitable] = set()
@staticmethod
def run_in_task(queue=None):
"""
@@ -187,6 +190,19 @@ class AsyncRunner:
return decorator
@staticmethod
def spawn(coroutine):
"""
Spawn a task to run a coroutine in a "fire and forget" mode.
Using this method instead of just calling `asyncio.create_task(coroutine)`
is necessary when you don't keep a reference to the task, because `asyncio`
only keeps weak references to alive tasks.
"""
task = asyncio.create_task(coroutine)
AsyncRunner.running_tasks.add(task)
task.add_done_callback(AsyncRunner.running_tasks.remove)
# -----------------------------------------------------------------------------
class FlowControlAsyncPipe:

View File

@@ -43,7 +43,7 @@ nav:
- Apps & Tools:
- Overview: apps_and_tools/index.md
- Console: apps_and_tools/console.md
- Link Relay: apps_and_tools/link_relay.md
- Bench: apps_and_tools/bench.md
- HCI Bridge: apps_and_tools/hci_bridge.md
- Golden Gate Bridge: apps_and_tools/gg_bridge.md
- Show: apps_and_tools/show.md
@@ -51,6 +51,7 @@ nav:
- Pair: apps_and_tools/pair.md
- Unbond: apps_and_tools/unbond.md
- USB Probe: apps_and_tools/usb_probe.md
- Link Relay: apps_and_tools/link_relay.md
- Hardware:
- Overview: hardware/index.md
- Platforms:
@@ -62,7 +63,7 @@ nav:
- Examples:
- Overview: examples/index.md
copyright: Copyright 2021-2022 Google LLC
copyright: Copyright 2021-2023 Google LLC
theme:
name: 'material'

View File

@@ -0,0 +1,158 @@
BENCH TOOL
==========
The "bench" tool implements a number of different ways of measuring the
throughput and/or latency between two devices.
# General Usage
```
Usage: bench.py [OPTIONS] COMMAND [ARGS]...
Options:
--device-config FILENAME Device configuration file
--role [sender|receiver|ping|pong]
--mode [gatt-client|gatt-server|l2cap-client|l2cap-server|rfcomm-client|rfcomm-server]
--att-mtu MTU GATT MTU (gatt-client mode) [23<=x<=517]
-s, --packet-size SIZE Packet size (server role) [8<=x<=4096]
-c, --packet-count COUNT Packet count (server role)
-sd, --start-delay SECONDS Start delay (server role)
--help Show this message and exit.
Commands:
central Run as a central (initiates the connection)
peripheral Run as a peripheral (waits for a connection)
```
## Options for the ``central`` Command
```
Usage: bumble-bench central [OPTIONS] TRANSPORT
Run as a central (initiates the connection)
Options:
--peripheral ADDRESS_OR_NAME Address or name to connect to
--connection-interval, --ci CONNECTION_INTERVAL
Connection interval (in ms)
--phy [1m|2m|coded] PHY to use
--help Show this message and exit.
```
To test once device against another, one of the two devices must be running
the ``peripheral`` command and the other the ``central`` command. The device
running the ``peripheral`` command will accept connections from the device
running the ``central`` command.
When using Bluetooth LE (all modes except for ``rfcomm-server`` and ``rfcomm-client``utils),
the default addresses configured in the tool should be sufficient. But when using
Bluetooth Classic, the address of the Peripheral must be specified on the Central
using the ``--peripheral`` option. The address will be printed by the Peripheral when
it starts.
Independently of whether the device is the Central or Peripheral, each device selects a
``mode`` and and ``role`` to run as. The ``mode`` and ``role`` of the Central and Peripheral
must be compatible.
Device 1 mode | Device 2 mode
------------------|------------------
``gatt-client`` | ``gatt-server``
``l2cap-client`` | ``l2cap-server``
``rfcomm-client`` | ``rfcomm-server``
Device 1 role | Device 2 role
--------------|--------------
``sender`` | ``receiver``
``ping`` | ``pong``
# Examples
In the following examples, we have two USB Bluetooth controllers, one on `usb:0` and
the other on `usb:1`, and two consoles/terminals. We will run a command in each.
!!! example "GATT Throughput"
Using the default mode and role for the Central and Peripheral.
In the first console/terminal:
```
$ bumble-bench peripheral usb:0
```
In the second console/terminal:
```
$ bumble-bench central usb:1
```
In this default configuration, the Central runs a Sender, as a GATT client,
connecting to the Peripheral running a Receiver, as a GATT server.
!!! example "L2CAP Throughput"
In the first console/terminal:
```
$ bumble-bench --mode l2cap-server peripheral usb:0
```
In the second console/terminal:
```
$ bumble-bench --mode l2cap-client central usb:1
```
!!! example "RFComm Throughput"
In the first console/terminal:
```
$ bumble-bench --mode rfcomm-server peripheral usb:0
```
NOTE: the BT address of the Peripheral will be printed out, use it with the
``--peripheral`` option for the Central.
In this example, we use a larger packet size and packet count than the default.
In the second console/terminal:
```
$ bumble-bench --mode rfcomm-client --packet-size 2000 --packet-count 100 central --peripheral 00:16:A4:5A:40:F2 usb:1
```
!!! example "Ping/Pong Latency"
In the first console/terminal:
```
$ bumble-bench --role pong peripheral usb:0
```
In the second console/terminal:
```
$ bumble-bench --role ping central usb:1
```
!!! example "Reversed modes with GATT and custom connection interval"
In the first console/terminal:
```
$ bumble-bench --mode gatt-client peripheral usb:0
```
In the second console/terminal:
```
$ bumble-bench --mode gatt-server central --ci 10 usb:1
```
!!! example "Reversed modes with L2CAP and custom PHY"
In the first console/terminal:
```
$ bumble-bench --mode l2cap-client peripheral usb:0
```
In the second console/terminal:
```
$ bumble-bench --mode l2cap-server central --phy 2m usb:1
```
!!! example "Reversed roles with L2CAP"
In the first console/terminal:
```
$ bumble-bench --mode l2cap-client --role sender peripheral usb:0
```
In the second console/terminal:
```
$ bumble-bench --mode l2cap-server --role receiver central usb:1
```

View File

@@ -5,6 +5,7 @@ Included in the project are a few apps and tools, built on top of the core libra
These include:
* [Console](console.md) - an interactive text-based console
* [Bench](bench.md) - Speed and Latency benchmarking between two devices (LE and Classic)
* [Pair](pair.md) - Pair/bond two devices (LE and Classic)
* [Unbond](unbond.md) - Remove a previously established bond
* [HCI Bridge](hci_bridge.md) - a HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets

View File

@@ -8,8 +8,7 @@ The project initially only supported BLE (Bluetooth Low Energy), but support for
eventually added. Support for BLE is therefore currently somewhat more advanced than for Classic.
!!! warning
This project is still very much experimental and in an alpha state where a lot of things are still missing or broken, and what's there changes frequently.
Also, there are still a few hardcoded values/parameters in some of the examples and apps which need to be changed (those will eventually be command line arguments, as appropriate)
This project is still in an early state of development where some things are still missing or broken, and what's implemented may change and evolve frequently.
Overview
--------

View File

@@ -57,6 +57,7 @@ console_scripts =
bumble-unbond = bumble.apps.unbond:main
bumble-usb-probe = bumble.apps.usb_probe:main
bumble-link-relay = bumble.apps.link_relay.link_relay:main
bumble-bench = bumble.apps.bench:main
[options.package_data]
* = py.typed, *.pyi
@@ -72,7 +73,7 @@ test =
development =
black == 22.10
invoke >= 1.7.3
mypy == 0.991
mypy == 1.1.1
nox >= 2022
pylint == 2.15.8
types-appdirs >= 1.4.3

47
tests/decoder_test.py Normal file
View File

@@ -0,0 +1,47 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import hashlib
import os
from bumble.decoder import G722Decoder
# -----------------------------------------------------------------------------
def test_decode_file():
decoder = G722Decoder()
output_bytes = bytearray()
with open(
os.path.join(os.path.dirname(__file__), 'g722_sample.g722'), 'rb'
) as file:
file_content = file.read()
frame_length = 80
data_length = int(len(file_content) / frame_length)
for i in range(0, data_length):
decoded_data = decoder.decode_frame(
file_content[i * frame_length : i * frame_length + frame_length]
)
output_bytes.extend(decoded_data)
result = hashlib.md5(output_bytes).hexdigest()
assert result == 'b58e0cdd012d12f5633fc796c3b0fbd4'
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_decode_file()

1
tests/g722_sample.g722 Normal file

File diff suppressed because one or more lines are too long

View File

@@ -37,10 +37,12 @@ from bumble.gatt import (
Service,
Characteristic,
CharacteristicValue,
Descriptor,
)
from bumble.transport import AsyncPipeSink
from bumble.core import UUID
from bumble.att import (
Attribute,
ATT_EXCHANGE_MTU_REQUEST,
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_PDU,
@@ -861,6 +863,29 @@ async def async_main():
await test_mtu_exchange()
# -----------------------------------------------------------------------------
def test_attribute_string_to_permissions():
assert Attribute.string_to_permissions('READABLE') == 1
assert Attribute.string_to_permissions('WRITEABLE') == 2
assert Attribute.string_to_permissions('READABLE,WRITEABLE') == 3
# -----------------------------------------------------------------------------
def test_charracteristic_permissions():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.permissions == 3
# -----------------------------------------------------------------------------
def test_descriptor_permissions():
descriptor = Descriptor('2902', 'READABLE,WRITEABLE')
assert descriptor.permissions == 3
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())

View File

@@ -52,6 +52,7 @@ from bumble.hci import (
HCI_LE_Set_Scan_Parameters_Command,
HCI_Number_Of_Completed_Packets_Event,
HCI_Packet,
HCI_PIN_Code_Request_Reply_Command,
HCI_Read_Local_Supported_Commands_Command,
HCI_Read_Local_Supported_Features_Command,
HCI_Read_Local_Version_Information_Command,
@@ -213,6 +214,17 @@ def test_HCI_Command():
# -----------------------------------------------------------------------------
def test_HCI_PIN_Code_Request_Reply_Command():
command = HCI_PIN_Code_Request_Reply_Command(
bd_addr=Address(
'00:11:22:33:44:55', address_type=Address.PUBLIC_DEVICE_ADDRESS
),
pin_code_length=4,
pin_code=b'1234',
)
basic_check(command)
def test_HCI_Reset_Command():
command = HCI_Reset_Command()
basic_check(command)
@@ -440,6 +452,7 @@ def run_test_events():
def run_test_commands():
test_HCI_Command()
test_HCI_Reset_Command()
test_HCI_PIN_Code_Request_Reply_Command()
test_HCI_Read_Local_Version_Information_Command()
test_HCI_Read_Local_Supported_Commands_Command()
test_HCI_Read_Local_Supported_Features_Command()

View File

@@ -72,5 +72,6 @@ def test_parser_extensions():
# -----------------------------------------------------------------------------
test_parser()
test_parser_extensions()
if __name__ == '__main__':
test_parser()
test_parser_extensions()