Compare commits

...

53 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod d35643524e allow specifying the address type 2023-12-08 18:46:25 -08:00
Gilles Boccon-Gibod 085f163c92 add support for 2M phy 2023-12-08 10:14:38 -08:00
zxzxwu 88b4cbdf1a Merge pull request #364 from zxzxwu/iso
Fix ISO packet issues
2023-12-05 00:41:56 +08:00
Josh Wu d6afbc6f4e Fix ISO packet issues 2023-12-04 20:31:11 +08:00
Gilles Boccon-Gibod fc90de3e7b Merge pull request #351 from google/dependabot/cargo/rust/openssl-0.10.60
Bump openssl from 0.10.57 to 0.10.60 in /rust
2023-12-04 00:41:27 -08:00
Gilles Boccon-Gibod 847c2ef114 Merge pull request #362 from google/gbg/more-le-features-constants
a few more HCI constants from the spec
2023-12-04 00:38:02 -08:00
Gilles Boccon-Gibod a0bf0c1f4d Merge pull request #363 from google/gbg/android-remote-proxy-cli
android remote proxy cli
2023-12-04 00:37:49 -08:00
Gilles Boccon-Gibod 8400ff0802 shared usage printer 2023-12-04 00:37:28 -08:00
Gilles Boccon-Gibod 0ed6aa230b address PR comment 2023-12-04 00:32:04 -08:00
Gilles Boccon-Gibod 72d5360af9 keep projects compatible with Android Studio Hedgehog 2023-12-03 18:06:54 -08:00
Gilles Boccon-Gibod ac3961e763 add doc 2023-12-03 17:50:42 -08:00
Gilles Boccon-Gibod 843466c822 a few more constants from the spec 2023-12-03 17:16:25 -08:00
Gilles Boccon-Gibod 8385035400 add CLI support 2023-12-03 16:35:14 -08:00
zxzxwu 3adcc8be09 Merge pull request #360 from zxzxwu/hci
Remove # type: ignore[call-arg] in HCI_Command builders
2023-12-03 19:18:04 +08:00
zxzxwu c853d56302 Merge pull request #361 from zxzxwu/hci-bug
Fix typo
2023-12-03 04:22:59 +08:00
Josh Wu dc97be5b35 Fix typo 2023-12-02 23:42:21 +08:00
zxzxwu 73dbdfff9f Merge pull request #356 from zxzxwu/bap
Add Published Audio Capabilities Service
2023-12-02 23:34:57 +08:00
Josh Wu dff14e1258 Add Published Audio Capabilities Service 2023-12-02 23:16:37 +08:00
Josh Wu 10a3833893 Remove # type: ignore[call-arg] in HCI_Command builders 2023-12-02 19:18:54 +08:00
zxzxwu 247cb89332 Merge pull request #358 from zxzxwu/coding2
Add variable-length bytes field
2023-12-01 03:26:38 +08:00
Josh Wu 3fc71a0266 Add variable-length bytes field 2023-12-01 03:16:52 +08:00
zxzxwu 392dcc3a05 Merge pull request #357 from zxzxwu/coding
Refactor CodingFormat
2023-12-01 03:15:33 +08:00
Josh Wu f27015d1b7 Refactor CodingFormat
As CodingFormat is now used by HFP and LEA, and vendor specific codecs
are introduced, this object needs to provide more information.
2023-12-01 02:58:09 +08:00
zxzxwu 86a19b41aa Merge pull request #344 from zxzxwu/cis
CIS and SCO responder support
2023-11-30 21:00:55 +08:00
Gilles Boccon-Gibod 320164d476 Merge pull request #355 from google/gbg/fix-gatt-unsubscribe
fix #354 (gatt unsubscribe)
2023-11-29 22:28:57 -08:00
Josh Wu 40ae661ee5 More SCO support and warnings and typo fix 2023-11-30 12:59:43 +08:00
Josh Wu c5def93bb8 CIS and SCO responder support 2023-11-30 12:16:40 +08:00
zxzxwu a9c4c5833d Merge pull request #350 from zxzxwu/csip
Add Coordinated Set Identification Service(CSIS)
2023-11-30 12:15:56 +08:00
Gilles Boccon-Gibod 58c9c4f590 fix #354 2023-11-29 19:19:40 -08:00
zxzxwu 24524d88cb Merge pull request #342 from zxzxwu/typing
Typing helper
2023-11-30 00:21:44 +08:00
zxzxwu b8849ab311 Merge pull request #349 from zxzxwu/stack
Log track back in on_packet
2023-11-30 00:20:20 +08:00
Josh Wu f3cd8f8ed0 Typing helper 2023-11-29 21:24:27 +08:00
zxzxwu 2b26de3f3a Merge pull request #348 from zxzxwu/gattc
Typing GATT Client and Device Peer
2023-11-29 15:09:40 +08:00
Josh Wu 0149c4c212 Log track back in on_packet
Many errors are raised in on_packet() callbacks, but currently it only
provides a very brief error message.
2023-11-29 15:01:15 +08:00
Gilles Boccon-Gibod f2ed898784 Merge pull request #352 from google/gbg/more-gatt-uuids
add a few uuids
2023-11-28 22:44:40 -08:00
Josh Wu 464a476f9f Add CSIP 2023-11-29 14:09:31 +08:00
Gilles Boccon-Gibod e85d067fb5 add a few uuids 2023-11-28 20:02:00 -08:00
dependabot[bot] 7eb493990f Bump openssl from 0.10.57 to 0.10.60 in /rust
Bumps [openssl](https://github.com/sfackler/rust-openssl) from 0.10.57 to 0.10.60.
- [Release notes](https://github.com/sfackler/rust-openssl/releases)
- [Commits](https://github.com/sfackler/rust-openssl/compare/openssl-v0.10.57...openssl-v0.10.60)

---
updated-dependencies:
- dependency-name: openssl
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-28 21:43:18 +00:00
Josh Wu 04d5bf3afc Typing GATT Client and Device Peer 2023-11-28 21:57:57 +08:00
zxzxwu a13e193d3b Merge pull request #343 from zxzxwu/lea-gatt
Add LE Audio GATT services and characteristics definitions
2023-11-28 10:34:39 +08:00
Gilles Boccon-Gibod 28a1a5ebc2 Merge pull request #347 from akuker/main
Include transport.grpc_protobuf in the setup package.
2023-11-27 15:23:17 -08:00
Tony Kuker 6310dc777f Include transport.grpc_protobuf in the setup package. 2023-11-27 16:48:37 -06:00
Josh Wu 863de18877 Add LE Audio GATT definitions 2023-11-27 17:53:00 +08:00
zxzxwu f0e5cdee1a Merge pull request #339 from zxzxwu/enc
Refactor crypto and fix CTKD
2023-11-27 14:05:37 +08:00
zxzxwu 7bc7d0f5af Merge pull request #334 from zxzxwu/extadv
Add support for LE Extended Advertising
2023-11-27 14:01:31 +08:00
Josh Wu a65a215fd7 Provide IntFlag.name property fallback 2023-11-26 19:42:22 +08:00
Josh Wu 80d34a226d Slightly refactor and fix CTKD
It seems sample input data provided in the spec is big-endian (just
like other AES-CMAC-based functions), but all keys are in little-endian(
HCI standard), so they need to be reverse before and after applying
AES-CMAC.
2023-11-26 16:55:10 +08:00
Josh Wu a9628f73e3 Add support for Extended Advertising 2023-11-26 15:03:09 +08:00
Lucas Abel 9bf2e03354 device: set authenticated and sc state on AES encryption change 2023-11-23 06:39:55 +01:00
Gilles Boccon-Gibod 2900b93bb3 Merge pull request #120 from google/gbg/usb-cleanup
minor cleanup of the internals of the usb transport implementation
2023-11-22 17:18:23 -08:00
Gilles Boccon-Gibod 284cc8a321 Merge pull request #326 from google/gbg/android-benchmark-app
Android benchmarking app
2023-11-22 15:39:52 -08:00
Gilles Boccon-Gibod 9c7089c8ff terminate when unplugged 2023-11-19 11:36:38 -08:00
Gilles Boccon-Gibod a8ec1b0949 minor cleanup of the internals of the usb transport implementation 2023-11-15 17:26:21 -08:00
43 changed files with 3094 additions and 650 deletions
+6
View File
@@ -21,7 +21,9 @@
"cccds",
"cmac",
"CONNECTIONLESS",
"csip",
"csrcs",
"CVSD",
"datagram",
"DATALINK",
"delayreport",
@@ -29,6 +31,7 @@
"deregistration",
"dhkey",
"diversifier",
"endianness",
"Fitbit",
"GATTLINK",
"HANDSFREE",
@@ -38,12 +41,14 @@
"libc",
"libusb",
"MITM",
"MSBC",
"NDIS",
"netsim",
"NONBLOCK",
"NONCONN",
"OXIMETER",
"popleft",
"PRAND",
"protobuf",
"psms",
"pyee",
@@ -55,6 +60,7 @@
"SEID",
"seids",
"SERV",
"SIRK",
"ssrc",
"strerror",
"subband",
+82 -66
View File
@@ -21,6 +21,8 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import logging
import operator
@@ -29,11 +31,13 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric.ec import (
generate_private_key,
ECDH,
EllipticCurvePrivateKey,
EllipticCurvePublicNumbers,
EllipticCurvePrivateNumbers,
SECP256R1,
)
from cryptography.hazmat.primitives import cmac
from typing import Tuple
# -----------------------------------------------------------------------------
@@ -46,16 +50,18 @@ logger = logging.getLogger(__name__)
# Classes
# -----------------------------------------------------------------------------
class EccKey:
def __init__(self, private_key):
def __init__(self, private_key: EllipticCurvePrivateKey) -> None:
self.private_key = private_key
@classmethod
def generate(cls):
def generate(cls) -> EccKey:
private_key = generate_private_key(SECP256R1())
return cls(private_key)
@classmethod
def from_private_key_bytes(cls, d_bytes, x_bytes, y_bytes):
def from_private_key_bytes(
cls, d_bytes: bytes, x_bytes: bytes, y_bytes: bytes
) -> EccKey:
d = int.from_bytes(d_bytes, byteorder='big', signed=False)
x = int.from_bytes(x_bytes, byteorder='big', signed=False)
y = int.from_bytes(y_bytes, byteorder='big', signed=False)
@@ -65,7 +71,7 @@ class EccKey:
return cls(private_key)
@property
def x(self):
def x(self) -> bytes:
return (
self.private_key.public_key()
.public_numbers()
@@ -73,14 +79,14 @@ class EccKey:
)
@property
def y(self):
def y(self) -> bytes:
return (
self.private_key.public_key()
.public_numbers()
.y.to_bytes(32, byteorder='big')
)
def dh(self, public_key_x, public_key_y):
def dh(self, public_key_x: bytes, public_key_y: bytes) -> bytes:
x = int.from_bytes(public_key_x, byteorder='big', signed=False)
y = int.from_bytes(public_key_y, byteorder='big', signed=False)
public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key()
@@ -93,14 +99,23 @@ class EccKey:
# Functions
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def xor(x, y):
def xor(x: bytes, y: bytes) -> bytes:
assert len(x) == len(y)
return bytes(map(operator.xor, x, y))
# -----------------------------------------------------------------------------
def r():
def reverse(input: bytes) -> bytes:
'''
Returns bytes of input in reversed endianness.
'''
return input[::-1]
# -----------------------------------------------------------------------------
def r() -> bytes:
'''
Generate 16 bytes of random data
'''
@@ -108,20 +123,20 @@ def r():
# -----------------------------------------------------------------------------
def e(key, data):
def e(key: bytes, data: bytes) -> bytes:
'''
AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output.
See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e
'''
cipher = Cipher(algorithms.AES(bytes(reversed(key))), modes.ECB())
cipher = Cipher(algorithms.AES(reverse(key)), modes.ECB())
encryptor = cipher.encryptor()
return bytes(reversed(encryptor.update(bytes(reversed(data)))))
return reverse(encryptor.update(reverse(data)))
# -----------------------------------------------------------------------------
def ah(k, r): # pylint: disable=redefined-outer-name
def ah(k: bytes, r: bytes) -> bytes: # pylint: disable=redefined-outer-name
'''
See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah
'''
@@ -132,7 +147,16 @@ def ah(k, r): # pylint: disable=redefined-outer-name
# -----------------------------------------------------------------------------
def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-name
def c1(
k: bytes,
r: bytes,
preq: bytes,
pres: bytes,
iat: int,
rat: int,
ia: bytes,
ra: bytes,
) -> bytes: # pylint: disable=redefined-outer-name
'''
See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for
LE Legacy Pairing
@@ -144,7 +168,7 @@ def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-n
# -----------------------------------------------------------------------------
def s1(k, r1, r2):
def s1(k: bytes, r1: bytes, r2: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy
Pairing
@@ -154,7 +178,7 @@ def s1(k, r1, r2):
# -----------------------------------------------------------------------------
def aes_cmac(m, k):
def aes_cmac(m: bytes, k: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC
@@ -166,20 +190,16 @@ def aes_cmac(m, k):
# -----------------------------------------------------------------------------
def f4(u, v, x, z):
def f4(u: bytes, v: bytes, x: bytes, z: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value
Generation Function f4
'''
return bytes(
reversed(
aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + z, bytes(reversed(x)))
)
)
return reverse(aes_cmac(reverse(u) + reverse(v) + z, reverse(x)))
# -----------------------------------------------------------------------------
def f5(w, n1, n2, a1, a2):
def f5(w: bytes, n1: bytes, n2: bytes, a1: bytes, a2: bytes) -> Tuple[bytes, bytes]:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation
Function f5
@@ -187,87 +207,83 @@ def f5(w, n1, n2, a1, a2):
NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order
'''
salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE')
t = aes_cmac(bytes(reversed(w)), salt)
t = aes_cmac(reverse(w), salt)
key_id = bytes([0x62, 0x74, 0x6C, 0x65])
return (
bytes(
reversed(
aes_cmac(
bytes([0])
+ key_id
+ bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(a1))
+ bytes(reversed(a2))
+ bytes([1, 0]),
t,
)
reverse(
aes_cmac(
bytes([0])
+ key_id
+ reverse(n1)
+ reverse(n2)
+ reverse(a1)
+ reverse(a2)
+ bytes([1, 0]),
t,
)
),
bytes(
reversed(
aes_cmac(
bytes([1])
+ key_id
+ bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(a1))
+ bytes(reversed(a2))
+ bytes([1, 0]),
t,
)
reverse(
aes_cmac(
bytes([1])
+ key_id
+ reverse(n1)
+ reverse(n2)
+ reverse(a1)
+ reverse(a2)
+ bytes([1, 0]),
t,
)
),
)
# -----------------------------------------------------------------------------
def f6(w, n1, n2, r, io_cap, a1, a2): # pylint: disable=redefined-outer-name
def f6(
w: bytes, n1: bytes, n2: bytes, r: bytes, io_cap: bytes, a1: bytes, a2: bytes
) -> bytes: # pylint: disable=redefined-outer-name
'''
See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value
Generation Function f6
'''
return bytes(
reversed(
aes_cmac(
bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(r))
+ bytes(reversed(io_cap))
+ bytes(reversed(a1))
+ bytes(reversed(a2)),
bytes(reversed(w)),
)
return reverse(
aes_cmac(
reverse(n1)
+ reverse(n2)
+ reverse(r)
+ reverse(io_cap)
+ reverse(a1)
+ reverse(a2),
reverse(w),
)
)
# -----------------------------------------------------------------------------
def g2(u, v, x, y):
def g2(u: bytes, v: bytes, x: bytes, y: bytes) -> int:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison
Value Generation Function g2
'''
return int.from_bytes(
aes_cmac(
bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)),
bytes(reversed(x)),
reverse(u) + reverse(v) + reverse(y),
reverse(x),
)[-4:],
byteorder='big',
)
# -----------------------------------------------------------------------------
def h6(w, key_id):
def h6(w: bytes, key_id: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
'''
return aes_cmac(key_id, w)
return reverse(aes_cmac(key_id, reverse(w)))
# -----------------------------------------------------------------------------
def h7(salt, w):
def h7(salt: bytes, w: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
'''
return aes_cmac(w, salt)
return reverse(aes_cmac(reverse(w), salt))
+612 -87
View File
File diff suppressed because it is too large Load Diff
+116 -6
View File
@@ -93,20 +93,35 @@ GATT_RECONNECTION_CONFIGURATION_SERVICE = UUID.from_16_bits(0x1829, 'Reconne
GATT_INSULIN_DELIVERY_SERVICE = UUID.from_16_bits(0x183A, 'Insulin Delivery')
GATT_BINARY_SENSOR_SERVICE = UUID.from_16_bits(0x183B, 'Binary Sensor')
GATT_EMERGENCY_CONFIGURATION_SERVICE = UUID.from_16_bits(0x183C, 'Emergency Configuration')
GATT_AUTHORIZATION_CONTROL_SERVICE = UUID.from_16_bits(0x183D, 'Authorization Control')
GATT_PHYSICAL_ACTIVITY_MONITOR_SERVICE = UUID.from_16_bits(0x183E, 'Physical Activity Monitor')
GATT_ELAPSED_TIME_SERVICE = UUID.from_16_bits(0x183F, 'Elapsed Time')
GATT_GENERIC_HEALTH_SENSOR_SERVICE = UUID.from_16_bits(0x1840, 'Generic Health Sensor')
GATT_AUDIO_INPUT_CONTROL_SERVICE = UUID.from_16_bits(0x1843, 'Audio Input Control')
GATT_VOLUME_CONTROL_SERVICE = UUID.from_16_bits(0x1844, 'Volume Control')
GATT_VOLUME_OFFSET_CONTROL_SERVICE = UUID.from_16_bits(0x1845, 'Volume Offset Control')
GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification Service')
GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification')
GATT_DEVICE_TIME_SERVICE = UUID.from_16_bits(0x1847, 'Device Time')
GATT_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1848, 'Media Control Service')
GATT_GENERIC_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1849, 'Generic Media Control Service')
GATT_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1848, 'Media Control')
GATT_GENERIC_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1849, 'Generic Media Control')
GATT_CONSTANT_TONE_EXTENSION_SERVICE = UUID.from_16_bits(0x184A, 'Constant Tone Extension')
GATT_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184B, 'Telephone Bearer Service')
GATT_GENERIC_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer Service')
GATT_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184B, 'Telephone Bearer')
GATT_GENERIC_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer')
GATT_MICROPHONE_CONTROL_SERVICE = UUID.from_16_bits(0x184D, 'Microphone Control')
GATT_AUDIO_STREAM_CONTROL_SERVICE = UUID.from_16_bits(0x184E, 'Audio Stream Control')
GATT_BROADCAST_AUDIO_SCAN_SERVICE = UUID.from_16_bits(0x184F, 'Broadcast Audio Scan')
GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE = UUID.from_16_bits(0x1850, 'Published Audio Capabilities')
GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1851, 'Basic Audio Announcement')
GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1852, 'Broadcast Audio Announcement')
GATT_COMMON_AUDIO_SERVICE = UUID.from_16_bits(0x1853, 'Common Audio')
GATT_HEARING_ACCESS_SERVICE = UUID.from_16_bits(0x1854, 'Hearing Access')
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE = UUID.from_16_bits(0x1855, 'Telephony and Media Audio')
GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1856, 'Public Broadcast Announcement')
GATT_ELECTRONIC_SHELF_LABEL_SERVICE = UUID.from_16_bits(0X1857, 'Electronic Shelf Label')
GATT_GAMING_AUDIO_SERVICE = UUID.from_16_bits(0x1858, 'Gaming Audio')
GATT_MESH_PROXY_SOLICITATION_SERVICE = UUID.from_16_bits(0x1859, 'Mesh Audio Solicitation')
# Types
# Attribute Types
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2800, 'Primary Service')
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2801, 'Secondary Service')
GATT_INCLUDE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2802, 'Include')
@@ -129,6 +144,8 @@ GATT_ENVIRONMENTAL_SENSING_MEASUREMENT_DESCRIPTOR = UUID.from_16_bits(0x290C,
GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting')
GATT_TIME_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290E, 'Time Trigger Setting')
GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
GATT_OBSERVATION_SCHEDULE_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Observation Schedule')
GATT_VALID_RANGE_AND_ACCURACY_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Valid Range And Accuracy')
# Device Information Service
GATT_SYSTEM_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A23, 'System ID')
@@ -156,6 +173,96 @@ GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart
# Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
# Telephony And Media Audio Service (TMAS)
GATT_TMAP_ROLE_CHARACTERISTIC = UUID.from_16_bits(0x2B51, 'TMAP Role')
# Audio Input Control Service (AICS)
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B77, 'Audio Input State')
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC = UUID.from_16_bits(0x2B78, 'Gain Settings Attribute')
GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC = UUID.from_16_bits(0x2B79, 'Audio Input Type')
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC = UUID.from_16_bits(0x2B7A, 'Audio Input Status')
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7B, 'Audio Input Control Point')
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC = UUID.from_16_bits(0x2B7C, 'Audio Input Description')
# Volume Control Service (VCS)
GATT_VOLUME_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B7D, 'Volume State')
GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7E, 'Volume Control Point')
GATT_VOLUME_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2B7F, 'Volume Flags')
# Volume Offset Control Service (VOCS)
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B80, 'Volume Offset State')
GATT_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2B81, 'Audio Location')
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B82, 'Volume Offset Control Point')
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC = UUID.from_16_bits(0x2B83, 'Audio Output Description')
# Coordinated Set Identification Service (CSIS)
GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC = UUID.from_16_bits(0x2B84, 'Set Identity Resolving Key')
GATT_COORDINATED_SET_SIZE_CHARACTERISTIC = UUID.from_16_bits(0x2B85, 'Coordinated Set Size')
GATT_SET_MEMBER_LOCK_CHARACTERISTIC = UUID.from_16_bits(0x2B86, 'Set Member Lock')
GATT_SET_MEMBER_RANK_CHARACTERISTIC = UUID.from_16_bits(0x2B87, 'Set Member Rank')
# Media Control Service (MCS)
GATT_MEDIA_PLAYER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2B93, 'Media Player Name')
GATT_MEDIA_PLAYER_ICON_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B94, 'Media Player Icon Object ID')
GATT_MEDIA_PLAYER_ICON_URL_CHARACTERISTIC = UUID.from_16_bits(0x2B95, 'Media Player Icon URL')
GATT_TRACK_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2B96, 'Track Changed')
GATT_TRACK_TITLE_CHARACTERISTIC = UUID.from_16_bits(0x2B97, 'Track Title')
GATT_TRACK_DURATION_CHARACTERISTIC = UUID.from_16_bits(0x2B98, 'Track Duration')
GATT_TRACK_POSITION_CHARACTERISTIC = UUID.from_16_bits(0x2B99, 'Track Position')
GATT_PLAYBACK_SPEED_CHARACTERISTIC = UUID.from_16_bits(0x2B9A, 'Playback Speed')
GATT_SEEKING_SPEED_CHARACTERISTIC = UUID.from_16_bits(0x2B9B, 'Seeking Speed')
GATT_CURRENT_TRACK_SEGMENTS_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9C, 'Current Track Segments Object ID')
GATT_CURRENT_TRACK_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9D, 'Current Track Object ID')
GATT_NEXT_TRACK_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9E, 'Next Track Object ID')
GATT_PARENT_GROUP_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9F, 'Parent Group Object ID')
GATT_CURRENT_GROUP_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BA0, 'Current Group Object ID')
GATT_PLAYING_ORDER_CHARACTERISTIC = UUID.from_16_bits(0x2BA1, 'Playing Order')
GATT_PLAYING_ORDERS_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA2, 'Playing Orders Supported')
GATT_MEDIA_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BA3, 'Media State')
GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BA4, 'Media Control Point')
GATT_MEDIA_CONTROL_POINT_OPCODES_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA5, 'Media Control Point Opcodes Supported')
GATT_SEARCH_RESULTS_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BA6, 'Search Results Object ID')
GATT_SEARCH_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BA7, 'Search Control Point')
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Content Control Id')
# Telephone Bearer Service (TBS)
GATT_BEARER_PROVIDER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BB4, 'Bearer Provider Name')
GATT_BEARER_UCI_CHARACTERISTIC = UUID.from_16_bits(0x2BB5, 'Bearer UCI')
GATT_BEARER_TECHNOLOGY_CHARACTERISTIC = UUID.from_16_bits(0x2BB6, 'Bearer Technology')
GATT_BEARER_URI_SCHEMES_SUPPORTED_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2BB7, 'Bearer URI Schemes Supported List')
GATT_BEARER_SIGNAL_STRENGTH_CHARACTERISTIC = UUID.from_16_bits(0x2BB8, 'Bearer Signal Strength')
GATT_BEARER_SIGNAL_STRENGTH_REPORTING_INTERVAL_CHARACTERISTIC = UUID.from_16_bits(0x2BB9, 'Bearer Signal Strength Reporting Interval')
GATT_BEARER_LIST_CURRENT_CALLS_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Bearer List Current Calls')
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBB, 'Content Control ID')
GATT_STATUS_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2BBC, 'Status Flags')
GATT_INCOMING_CALL_TARGET_BEARER_URI_CHARACTERISTIC = UUID.from_16_bits(0x2BBD, 'Incoming Call Target Bearer URI')
GATT_CALL_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BBE, 'Call State')
GATT_CALL_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BBF, 'Call Control Point')
GATT_CALL_CONTROL_POINT_OPTIONAL_OPCODES_CHARACTERISTIC = UUID.from_16_bits(0x2BC0, 'Call Control Point Optional Opcodes')
GATT_TERMINATION_REASON_CHARACTERISTIC = UUID.from_16_bits(0x2BC1, 'Termination Reason')
GATT_INCOMING_CALL_CHARACTERISTIC = UUID.from_16_bits(0x2BC2, 'Incoming Call')
GATT_CALL_FRIENDLY_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Call Friendly Name')
# Microphone Control Service (MICS)
GATT_MUTE_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Mute')
# Audio Stream Control Service (ASCS)
GATT_SINK_ASE_CHARACTERISTIC = UUID.from_16_bits(0x2BC4, 'Sink ASE')
GATT_SOURCE_ASE_CHARACTERISTIC = UUID.from_16_bits(0x2BC5, 'Source ASE')
GATT_ASE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC6, 'ASE Control Point')
# Broadcast Audio Scan Service (BASS)
GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC7, 'Broadcast Audio Scan Control Point')
GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BC8, 'Broadcast Receive State')
# Published Audio Capabilities Service (PACS)
GATT_SINK_PAC_CHARACTERISTIC = UUID.from_16_bits(0x2BC9, 'Sink PAC')
GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCA, 'Sink Audio Location')
GATT_SOURCE_PAC_CHARACTERISTIC = UUID.from_16_bits(0x2BCB, 'Source PAC')
GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCC, 'Source Audio Location')
GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts')
GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts')
# ASHA Service
GATT_ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
@@ -177,6 +284,9 @@ GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bi
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B29, 'Client Supported Features')
GATT_DATABASE_HASH_CHARACTERISTIC = UUID.from_16_bits(0x2B2A, 'Database Hash')
GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B3A, 'Server Supported Features')
# fmt: on
# pylint: enable=line-too-long
+56 -20
View File
@@ -38,6 +38,7 @@ from typing import (
Any,
Iterable,
Type,
Set,
TYPE_CHECKING,
)
@@ -128,7 +129,7 @@ class ServiceProxy(AttributeProxy):
included_services: List[ServiceProxy]
@staticmethod
def from_client(service_class, client, service_uuid):
def from_client(service_class, client: Client, service_uuid: UUID):
# The service and its characteristics are considered to have already been
# discovered
services = client.get_services_by_uuid(service_uuid)
@@ -206,11 +207,11 @@ class CharacteristicProxy(AttributeProxy):
return await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None):
async def unsubscribe(self, subscriber=None, force=False):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return await self.client.unsubscribe(self, subscriber)
return await self.client.unsubscribe(self, subscriber, force)
def __str__(self) -> str:
return (
@@ -246,8 +247,12 @@ class ProfileServiceProxy:
class Client:
services: List[ServiceProxy]
cached_values: Dict[int, Tuple[datetime, bytes]]
notification_subscribers: Dict[int, Callable[[bytes], Any]]
indication_subscribers: Dict[int, Callable[[bytes], Any]]
notification_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
indication_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
pending_response: Optional[asyncio.futures.Future[ATT_PDU]]
pending_request: Optional[ATT_PDU]
@@ -257,10 +262,8 @@ class Client:
self.request_semaphore = asyncio.Semaphore(1)
self.pending_request = None
self.pending_response = None
self.notification_subscribers = (
{}
) # Notification subscribers, by attribute handle
self.indication_subscribers = {} # Indication subscribers, by attribute handle
self.notification_subscribers = {} # Subscriber set, by attribute handle
self.indication_subscribers = {} # Subscriber set, by attribute handle
self.services = []
self.cached_values = {}
@@ -682,8 +685,8 @@ class Client:
async def discover_descriptors(
self,
characteristic: Optional[CharacteristicProxy] = None,
start_handle=None,
end_handle=None,
start_handle: Optional[int] = None,
end_handle: Optional[int] = None,
) -> List[DescriptorProxy]:
'''
See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors
@@ -789,7 +792,12 @@ class Client:
return attributes
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
async def subscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
prefer_notify: bool = True,
) -> None:
# If we haven't already discovered the descriptors for this characteristic,
# do it now
if not characteristic.descriptors_discovered:
@@ -826,6 +834,7 @@ class Client:
subscriber_set = subscribers.setdefault(characteristic.handle, set())
if subscriber is not None:
subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the
# characteristic emitting an 'update' event when a notification or indication
# is received
@@ -833,7 +842,18 @@ class Client:
await self.write_value(cccd, struct.pack('<H', bits), with_response=True)
async def unsubscribe(self, characteristic, subscriber=None):
async def unsubscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
force: bool = False,
) -> None:
'''
Unsubscribe from a characteristic.
If `force` is True, this will write zeros to the CCCD when there are no
subscribers left, even if there were already no registered subscribers.
'''
# If we haven't already discovered the descriptors for this characteristic,
# do it now
if not characteristic.descriptors_discovered:
@@ -847,31 +867,45 @@ class Client:
logger.warning('unsubscribing from characteristic with no CCCD descriptor')
return
# Check if the characteristic has subscribers
if not (
characteristic.handle in self.notification_subscribers
or characteristic.handle in self.indication_subscribers
):
if not force:
return
# Remove the subscriber(s)
if subscriber is not None:
# Remove matching subscriber from subscriber sets
for subscriber_set in (
self.notification_subscribers,
self.indication_subscribers,
):
subscribers = subscriber_set.get(characteristic.handle, [])
if subscriber in subscribers:
if (
subscribers := subscriber_set.get(characteristic.handle)
) and subscriber in subscribers:
subscribers.remove(subscriber)
# Cleanup if we removed the last one
if not subscribers:
del subscriber_set[characteristic.handle]
else:
# Remove all subscribers for this attribute from the sets!
# Remove all subscribers for this attribute from the sets
self.notification_subscribers.pop(characteristic.handle, None)
self.indication_subscribers.pop(characteristic.handle, None)
if not self.notification_subscribers and not self.indication_subscribers:
# Update the CCCD
if not (
characteristic.handle in self.notification_subscribers
or characteristic.handle in self.indication_subscribers
):
# No more subscribers left
await self.write_value(cccd, b'\x00\x00', with_response=True)
async def read_value(
self, attribute: Union[int, AttributeProxy], no_long_read: bool = False
) -> Any:
) -> bytes:
'''
See Vol 3, Part G - 4.8.1 Read Characteristic Value
@@ -1067,7 +1101,7 @@ class Client:
def on_att_handle_value_notification(self, notification):
# Call all subscribers
subscribers = self.notification_subscribers.get(
notification.attribute_handle, []
notification.attribute_handle, set()
)
if not subscribers:
logger.warning('!!! received notification with no subscriber')
@@ -1081,7 +1115,9 @@ class Client:
def on_att_handle_value_indication(self, indication):
# Call all subscribers
subscribers = self.indication_subscribers.get(indication.attribute_handle, [])
subscribers = self.indication_subscribers.get(
indication.attribute_handle, set()
)
if not subscribers:
logger.warning('!!! received indication with no subscriber')
+187 -143
View File
@@ -17,6 +17,7 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import collections
import dataclasses
import enum
import functools
import logging
@@ -560,6 +561,12 @@ HCI_LE_TRANSMITTER_TEST_V4_COMMAND = hci_c
HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_command_op_code(0x08, 0x007C)
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D)
HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E)
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x007F)
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND = hci_command_op_code(0x08, 0x0082)
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND = hci_command_op_code(0x08, 0x0083)
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND = hci_command_op_code(0x08, 0x0084)
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND = hci_command_op_code(0x08, 0x0085)
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x0086)
# HCI Error Codes
@@ -1316,56 +1323,72 @@ HCI_SUPPORTED_COMMANDS_FLAGS = (
(
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND,
HCI_LE_SUBRATE_REQUEST_COMMAND,
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND,
None,
None,
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND,
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND,
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND
),
# Octet 47
(
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND,
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND,
None,
None,
None,
None,
None,
None,
None
)
)
# LE Supported Features
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
# See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
HCI_ADVERTISING_CODING_SELECTION_LE_SUPPORTED_FEATURE = 40
HCI_ADVERTISING_CODING_SELECTION_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 41
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER_LE_SUPPORTED_FEATURE = 43
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER_LE_SUPPORTED_FEATURE = 44
HCI_LE_SUPPORTED_FEATURES_NAMES = {
flag: feature_name for (feature_name, flag) in globals().items()
@@ -1382,6 +1405,45 @@ HCI_LE_SUPPORTED_FEATURES_NAMES = {
STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)}
class CodecID(enum.IntEnum):
# fmt: off
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
LINEAR_PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
VENDOR_SPECIFIC = 0xFF
@dataclasses.dataclass(frozen=True)
class CodingFormat:
codec_id: CodecID
company_id: int = 0
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int):
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
return offset + 5, cls(
codec_id=CodecID(codec_id),
company_id=company_id,
vendor_specific_codec_id=vendor_specific_codec_id,
)
def to_bytes(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# -----------------------------------------------------------------------------
class HCI_Constant:
@staticmethod
@@ -1477,6 +1539,12 @@ class HCI_Object:
# The rest of the bytes
field_value = data[offset:]
return (field_value, len(field_value))
if field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_length = data[offset]
offset += 1
field_value = data[offset : offset + field_length]
return (field_value, field_length + 1)
if field_type == 1:
# 8-bit unsigned
return (data[offset], 1)
@@ -1581,6 +1649,11 @@ class HCI_Object:
raise ValueError('value too large for *-typed field')
else:
field_bytes = bytes(field_value)
elif field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_bytes = bytes(field_value)
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes'
):
@@ -1888,6 +1961,7 @@ Address.NIL = Address(b"\xff\xff\xff\xff\xff\xff", Address.PUBLIC_DEVICE_ADDRESS
Address.ANY = Address(b"\x00\x00\x00\x00\x00\x00", Address.PUBLIC_DEVICE_ADDRESS)
Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_ADDRESS)
# -----------------------------------------------------------------------------
class OwnAddressType:
PUBLIC = 0
@@ -1934,6 +2008,9 @@ class HCI_Packet:
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
return HCI_CustomPacket(packet)
def __init__(self, name):
@@ -1966,6 +2043,7 @@ class HCI_Command(HCI_Packet):
hci_packet_type = HCI_COMMAND_PACKET
command_names: Dict[int, str] = {}
command_classes: Dict[int, Type[HCI_Command]] = {}
op_code: int
@staticmethod
def command(fields=(), return_parameters_fields=()):
@@ -2051,7 +2129,11 @@ class HCI_Command(HCI_Packet):
return_parameters.fields = cls.return_parameters_fields
return return_parameters
def __init__(self, op_code, parameters=None, **kwargs):
def __init__(self, op_code=-1, parameters=None, **kwargs):
# Since the legacy implementation relies on an __init__ injector, typing always
# complains that positional argument op_code is not passed, so here sets a
# default value to allow building derived HCI_Command without op_code.
assert op_code != -1
super().__init__(HCI_Command.command_name(op_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
HCI_Object.init_from_fields(self, fields, kwargs)
@@ -2445,14 +2527,14 @@ class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command):
('connection_handle', 2),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', CodingFormat.parse_from_bytes),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
@@ -2473,22 +2555,6 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
See Bluetooth spec @ 7.1.45 Enhanced Setup Synchronous Connection Command
'''
class CodingFormat(enum.IntEnum):
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
def to_bytes(self):
return self.value.to_bytes(5, 'little')
def __bytes__(self):
return self.to_bytes()
class PcmDataFormat(enum.IntEnum):
NA = 0x00
ONES_COMPLEMENT = 0x01
@@ -2525,14 +2591,14 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
('bd_addr', Address.parse_address),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', CodingFormat.parse_from_bytes),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
@@ -3829,8 +3895,10 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'advertising_event_properties',
{
'size': 2,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string(
x
'mapper': lambda x: str(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
x
)
),
},
),
@@ -3840,8 +3908,8 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'primary_advertising_channel_map',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(
x
'mapper': lambda x: str(
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap(x)
),
},
),
@@ -3863,38 +3931,33 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
'''
CONNECTABLE_ADVERTISING = 0
SCANNABLE_ADVERTISING = 1
DIRECTED_ADVERTISING = 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3
USE_LEGACY_ADVERTISING_PDUS = 4
ANONYMOUS_ADVERTISING = 5
INCLUDE_TX_POWER = 6
class AdvertisingProperties(enum.IntFlag):
CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1
DIRECTED_ADVERTISING = 1 << 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3
USE_LEGACY_ADVERTISING_PDUS = 1 << 4
ANONYMOUS_ADVERTISING = 1 << 5
INCLUDE_TX_POWER = 1 << 6
ADVERTISING_PROPERTIES_NAMES = (
'CONNECTABLE_ADVERTISING',
'SCANNABLE_ADVERTISING',
'DIRECTED_ADVERTISING',
'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING',
'USE_LEGACY_ADVERTISING_PDUS',
'ANONYMOUS_ADVERTISING',
'INCLUDE_TX_POWER',
)
def __str__(self) -> str:
return '|'.join(
flag.name
for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
if self.value & flag.value and flag.name is not None
)
CHANNEL_37 = 0
CHANNEL_38 = 1
CHANNEL_39 = 2
class ChannelMap(enum.IntFlag):
CHANNEL_37 = 1 << 0
CHANNEL_38 = 1 << 1
CHANNEL_39 = 1 << 2
CHANNEL_NAMES = ('37', '38', '39')
@classmethod
def advertising_properties_string(cls, properties):
# pylint: disable=line-too-long
return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]'
@classmethod
def channel_map_string(cls, channel_map):
return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
def __str__(self) -> str:
return '|'.join(
flag.name
for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap
if self.value & flag.value and flag.name is not None
)
# -----------------------------------------------------------------------------
@@ -3906,9 +3969,9 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
),
).name,
},
),
('fragment_preference', 1),
@@ -3926,23 +3989,12 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
'''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
UNCHANGED_DATA: 'UNCHANGED_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
class Operation(enum.IntEnum):
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04
# -----------------------------------------------------------------------------
@@ -3954,9 +4006,9 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
),
).name,
},
),
('fragment_preference', 1),
@@ -3974,22 +4026,6 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
'''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# -----------------------------------------------------------------------------
@HCI_Command.command(
@@ -4481,7 +4517,10 @@ class HCI_LE_Accept_CIS_Request_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('connection_handle', 2)],
fields=[
('connection_handle', 2),
('reason', {'size': 1, 'mapper': HCI_Constant.error_name}),
],
)
class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
'''
@@ -4489,6 +4528,7 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
'''
connection_handle: int
reason: int
# -----------------------------------------------------------------------------
@@ -4497,9 +4537,9 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
('connection_handle', 2),
('data_path_direction', 1),
('data_path_id', 1),
('codec_id', 5),
('codec_id', CodingFormat.parse_from_bytes),
('controller_delay', 3),
('codec_configuration', '*'),
('codec_configuration', 'v'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
@@ -4514,9 +4554,9 @@ class HCI_LE_Setup_ISO_Data_Path_Command(HCI_Command):
connection_handle: int
data_path_direction: int
data_path_id: int
codec_id: int
codec_id: CodingFormat
controller_delay: int
codec_configuration: int
codec_configuration: bytes
# -----------------------------------------------------------------------------
@@ -5326,6 +5366,10 @@ class HCI_Disconnection_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.5 Disconnection Complete Event
'''
status: int
connection_handle: int
reason: int
# -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)])
@@ -6079,7 +6123,7 @@ class HCI_IsoDataPacket(HCI_Packet):
if ts_flag:
if not should_include_sdu_info:
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}')
time_stamp, _ = struct.unpack_from('<I', packet, pos)
time_stamp, *_ = struct.unpack_from('<I', packet, pos)
pos += 4
if should_include_sdu_info:
@@ -6146,7 +6190,7 @@ class HCI_IsoDataPacket(HCI_Packet):
self.packet_sequence_number,
self.iso_sdu_length | self.packet_status_flag << 14,
]
return struct.pack(fmt, args) + self.iso_sdu_fragment
return struct.pack(fmt, *args) + self.iso_sdu_fragment
def __str__(self) -> str:
return (
+67 -42
View File
@@ -15,30 +15,39 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from collections.abc import Callable, MutableMapping
from typing import cast, Any
import logging
from .colors import color
from .att import ATT_CID, ATT_PDU
from .smp import SMP_CID, SMP_Command
from .core import name_or_number
from .l2cap import (
from bumble import avdtp
from bumble.colors import color
from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
from bumble.core import name_or_number
from bumble.l2cap import (
L2CAP_PDU,
L2CAP_CONNECTION_REQUEST,
L2CAP_CONNECTION_RESPONSE,
L2CAP_SIGNALING_CID,
L2CAP_LE_SIGNALING_CID,
L2CAP_Control_Frame,
L2CAP_Connection_Request,
L2CAP_Connection_Response,
)
from .hci import (
from bumble.hci import (
HCI_EVENT_PACKET,
HCI_ACL_DATA_PACKET,
HCI_DISCONNECTION_COMPLETE_EVENT,
HCI_AclDataPacketAssembler,
HCI_Packet,
HCI_Event,
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
)
from .rfcomm import RFCOMM_Frame, RFCOMM_PSM
from .sdp import SDP_PDU, SDP_PSM
from .avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM
# -----------------------------------------------------------------------------
# Logging
@@ -50,23 +59,25 @@ logger = logging.getLogger(__name__)
PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP',
AVDTP_PSM: 'AVDTP'
# TODO: add more PSM values
avdtp.AVDTP_PSM: 'AVDTP',
}
# -----------------------------------------------------------------------------
class PacketTracer:
class AclStream:
def __init__(self, analyzer):
psms: MutableMapping[int, int]
peer: PacketTracer.AclStream
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None # ACL stream in the other direction
# pylint: disable=too-many-nested-blocks
def on_acl_pdu(self, pdu):
def on_acl_pdu(self, pdu: bytes) -> None:
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
if l2cap_pdu.cid == ATT_CID:
@@ -81,26 +92,30 @@ class PacketTracer:
# Check if this signals a new channel
if control_frame.code == L2CAP_CONNECTION_REQUEST:
self.psms[control_frame.source_cid] = control_frame.psm
connection_request = cast(L2CAP_Connection_Request, control_frame)
self.psms[connection_request.source_cid] = connection_request.psm
elif control_frame.code == L2CAP_CONNECTION_RESPONSE:
connection_response = cast(L2CAP_Connection_Response, control_frame)
if (
control_frame.result
connection_response.result
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
):
if self.peer:
if psm := self.peer.psms.get(control_frame.source_cid):
if psm := self.peer.psms.get(
connection_response.source_cid
):
# Found a pending connection
self.psms[control_frame.destination_cid] = psm
self.psms[connection_response.destination_cid] = psm
# For AVDTP connections, create a packet assembler for
# each direction
if psm == AVDTP_PSM:
if psm == avdtp.AVDTP_PSM:
self.avdtp_assemblers[
control_frame.source_cid
] = AVDTP_MessageAssembler(self.on_avdtp_message)
connection_response.source_cid
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
control_frame.destination_cid
] = AVDTP_MessageAssembler(
connection_response.destination_cid
] = avdtp.MessageAssembler(
self.peer.on_avdtp_message
)
@@ -113,7 +128,7 @@ class PacketTracer:
elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame)
elif psm == AVDTP_PSM:
elif psm == avdtp.AVDTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
@@ -130,22 +145,26 @@ class PacketTracer:
else:
self.analyzer.emit(l2cap_pdu)
def on_avdtp_message(self, transaction_label, message):
def on_avdtp_message(
self, transaction_label: int, message: avdtp.Message
) -> None:
self.analyzer.emit(
f'{color("AVDTP", "green")} [{transaction_label}] {message}'
)
def feed_packet(self, packet):
def feed_packet(self, packet: HCI_AclDataPacket) -> None:
self.packet_assembler.feed_packet(packet)
class Analyzer:
def __init__(self, label, emit_message):
acl_streams: MutableMapping[int, PacketTracer.AclStream]
peer: PacketTracer.Analyzer
def __init__(self, label: str, emit_message: Callable[..., None]) -> None:
self.label = label
self.emit_message = emit_message
self.acl_streams = {} # ACL streams, by connection handle
self.peer = None # Analyzer in the other direction
def start_acl_stream(self, connection_handle):
def start_acl_stream(self, connection_handle: int) -> PacketTracer.AclStream:
logger.info(
f'[{self.label}] +++ Creating ACL stream for connection '
f'0x{connection_handle:04X}'
@@ -160,7 +179,7 @@ class PacketTracer:
return stream
def end_acl_stream(self, connection_handle):
def end_acl_stream(self, connection_handle: int) -> None:
if connection_handle in self.acl_streams:
logger.info(
f'[{self.label}] --- Removing ACL stream for connection '
@@ -171,23 +190,29 @@ class PacketTracer:
# Let the other forwarder know so it can cleanup its stream as well
self.peer.end_acl_stream(connection_handle)
def on_packet(self, packet):
def on_packet(self, packet: HCI_Packet) -> None:
self.emit(packet)
if packet.hci_packet_type == HCI_ACL_DATA_PACKET:
acl_packet = cast(HCI_AclDataPacket, packet)
# Look for an existing stream for this handle, create one if it is the
# first ACL packet for that connection handle
if (stream := self.acl_streams.get(packet.connection_handle)) is None:
stream = self.start_acl_stream(packet.connection_handle)
stream.feed_packet(packet)
if (
stream := self.acl_streams.get(acl_packet.connection_handle)
) is None:
stream = self.start_acl_stream(acl_packet.connection_handle)
stream.feed_packet(acl_packet)
elif packet.hci_packet_type == HCI_EVENT_PACKET:
if packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(packet.connection_handle)
event_packet = cast(HCI_Event, packet)
if event_packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(
cast(HCI_Disconnection_Complete_Event, packet).connection_handle
)
def emit(self, message):
def emit(self, message: Any) -> None:
self.emit_message(f'[{self.label}] {message}')
def trace(self, packet, direction=0):
def trace(self, packet: HCI_Packet, direction: int = 0) -> None:
if direction == 0:
self.host_to_controller_analyzer.on_packet(packet)
else:
@@ -195,10 +220,10 @@ class PacketTracer:
def __init__(
self,
host_to_controller_label=color('HOST->CONTROLLER', 'blue'),
controller_to_host_label=color('CONTROLLER->HOST', 'cyan'),
emit_message=logger.info,
):
host_to_controller_label: str = color('HOST->CONTROLLER', 'blue'),
controller_to_host_label: str = color('CONTROLLER->HOST', 'cyan'),
emit_message: Callable[..., None] = logger.info,
) -> None:
self.host_to_controller_analyzer = PacketTracer.Analyzer(
host_to_controller_label, emit_message
)
+37 -27
View File
@@ -22,7 +22,7 @@ import dataclasses
import enum
import traceback
import warnings
from typing import Dict, List, Union, Set, TYPE_CHECKING
from typing import Dict, List, Union, Set, Any, TYPE_CHECKING
from . import at
from . import rfcomm
@@ -35,7 +35,11 @@ from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
)
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hci import (
HCI_Enhanced_Setup_Synchronous_Connection_Command,
CodingFormat,
CodecID,
)
from bumble.sdp import (
DataElement,
ServiceAttribute,
@@ -66,6 +70,7 @@ class HfpProtocolError(ProtocolError):
# Protocol Support
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class HfpProtocol:
dlc: rfcomm.DLC
@@ -842,19 +847,15 @@ class DefaultCodecParameters(enum.IntEnum):
@dataclasses.dataclass
class EscoParameters:
# Codec specific
transmit_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat
receive_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat
transmit_coding_format: CodingFormat
receive_coding_format: CodingFormat
packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType
retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort
max_latency: int
# Common
input_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT
)
output_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT
)
input_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
input_coded_data_size: int = 16
output_coded_data_size: int = 16
input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = (
@@ -880,26 +881,31 @@ class EscoParameters:
transmit_codec_frame_size: int = 60
receive_codec_frame_size: int = 60
def asdict(self) -> Dict[str, Any]:
# dataclasses.asdict() will recursively deep-copy the entire object,
# which is expensive and breaks CodingFormat object, so let it simply copy here.
return self.__dict__
_ESCO_PARAMETERS_CVSD_D0 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
)
_ESCO_PARAMETERS_CVSD_D1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
)
_ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -912,8 +918,8 @@ _ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -925,8 +931,8 @@ _ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000A,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -938,8 +944,8 @@ _ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000C,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -951,8 +957,8 @@ _ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
)
_ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x0008,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -960,12 +966,14 @@ _ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
_ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x000D,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -974,10 +982,12 @@ _ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
ESCO_PERAMETERS = {
ESCO_PARAMETERS = {
DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0,
DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1,
DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1,
+27 -3
View File
@@ -32,8 +32,8 @@ from .hci import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET,
HCI_COMMAND_COMPLETE_EVENT,
HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
@@ -52,6 +52,7 @@ from .hci import (
HCI_Constant,
HCI_Error,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command,
@@ -75,7 +76,6 @@ from .core import (
BT_LE_TRANSPORT,
ConnectionPHY,
ConnectionParameters,
InvalidStateError,
)
from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
@@ -243,7 +243,7 @@ class Host(AbortableEventEmitter):
# understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
le_event_mask = bytes.fromhex('FFFFF00000000000')
le_event_mask = bytes.fromhex('FFFFFFFF00000000')
await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
@@ -495,6 +495,8 @@ class Host(AbortableEventEmitter):
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
@@ -515,6 +517,10 @@ class Host(AbortableEventEmitter):
# Experimental
self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
self.emit('l2cap_pdu', connection.handle, cid, pdu)
@@ -715,6 +721,24 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event)
def on_hci_le_cis_request_event(self, event):
self.emit(
'cis_request',
event.acl_connection_handle,
event.cis_connection_handle,
event.cig_id,
event.cis_id,
)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
'cis_establishment_failure', event.connection_handle, event.status
)
def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections:
logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle')
+9 -1
View File
@@ -391,6 +391,9 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
'''
psm: int
source_cid: int
@staticmethod
def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]:
psm_length = 2
@@ -432,6 +435,11 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
'''
source_cid: int
destination_cid: int
status: int
result: int
CONNECTION_SUCCESSFUL = 0x0000
CONNECTION_PENDING = 0x0001
CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
@@ -1918,7 +1926,7 @@ class ChannelManager:
supervision_timeout=request.timeout,
min_ce_length=0,
max_ce_length=0,
) # type: ignore[call-arg]
)
)
else:
self.send_control_frame(
+496
View File
@@ -0,0 +1,496 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from collections.abc import Sequence
import dataclasses
import enum
import struct
import functools
from typing import Optional, List, Union
from bumble import hci
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class AudioLocation(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location'''
# fmt: off
NOT_ALLOWED = 0x00000000
FRONT_LEFT = 0x00000001
FRONT_RIGHT = 0x00000002
FRONT_CENTER = 0x00000004
LOW_FREQUENCY_EFFECTS_1 = 0x00000008
BACK_LEFT = 0x00000010
BACK_RIGHT = 0x00000020
FRONT_LEFT_OF_CENTER = 0x00000040
FRONT_RIGHT_OF_CENTER = 0x00000080
BACK_CENTER = 0x00000100
LOW_FREQUENCY_EFFECTS_2 = 0x00000200
SIDE_LEFT = 0x00000400
SIDE_RIGHT = 0x00000800
TOP_FRONT_LEFT = 0x00001000
TOP_FRONT_RIGHT = 0x00002000
TOP_FRONT_CENTER = 0x00004000
TOP_CENTER = 0x00008000
TOP_BACK_LEFT = 0x00010000
TOP_BACK_RIGHT = 0x00020000
TOP_SIDE_LEFT = 0x00040000
TOP_SIDE_RIGHT = 0x00080000
TOP_BACK_CENTER = 0x00100000
BOTTOM_FRONT_CENTER = 0x00200000
BOTTOM_FRONT_LEFT = 0x00400000
BOTTOM_FRONT_RIGHT = 0x00800000
FRONT_LEFT_WIDE = 0x01000000
FRONT_RIGHT_WIDE = 0x02000000
LEFT_SURROUND = 0x04000000
RIGHT_SURROUND = 0x08000000
class AudioInputType(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type'''
# fmt: off
UNSPECIFIED = 0x00
BLUETOOTH = 0x01
MICROPHONE = 0x02
ANALOG = 0x03
DIGITAL = 0x04
RADIO = 0x05
STREAMING = 0x06
AMBIENT = 0x07
class ContextType(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type'''
# fmt: off
PROHIBITED = 0x0000
CONVERSATIONAL = 0x0002
MEDIA = 0x0004
GAME = 0x0008
INSTRUCTIONAL = 0x0010
VOICE_ASSISTANTS = 0x0020
LIVE = 0x0040
SOUND_EFFECTS = 0x0080
NOTIFICATIONS = 0x0100
RINGTONE = 0x0200
ALERTS = 0x0400
EMERGENCY_ALARM = 0x0800
class SamplingFrequency(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
# fmt: off
FREQ_8000 = 0x01
FREQ_11025 = 0x02
FREQ_16000 = 0x03
FREQ_22050 = 0x04
FREQ_24000 = 0x05
FREQ_32000 = 0x06
FREQ_44100 = 0x07
FREQ_48000 = 0x08
FREQ_88200 = 0x09
FREQ_96000 = 0x0A
FREQ_176400 = 0x0B
FREQ_192000 = 0x0C
FREQ_384000 = 0x0D
# fmt: on
@classmethod
def from_hz(cls, frequency: int) -> SamplingFrequency:
return {
8000: SamplingFrequency.FREQ_8000,
11025: SamplingFrequency.FREQ_11025,
16000: SamplingFrequency.FREQ_16000,
22050: SamplingFrequency.FREQ_22050,
24000: SamplingFrequency.FREQ_24000,
32000: SamplingFrequency.FREQ_32000,
44100: SamplingFrequency.FREQ_44100,
48000: SamplingFrequency.FREQ_48000,
88200: SamplingFrequency.FREQ_88200,
96000: SamplingFrequency.FREQ_96000,
176400: SamplingFrequency.FREQ_176400,
192000: SamplingFrequency.FREQ_192000,
384000: SamplingFrequency.FREQ_384000,
}[frequency]
@property
def hz(self) -> int:
return {
SamplingFrequency.FREQ_8000: 8000,
SamplingFrequency.FREQ_11025: 11025,
SamplingFrequency.FREQ_16000: 16000,
SamplingFrequency.FREQ_22050: 22050,
SamplingFrequency.FREQ_24000: 24000,
SamplingFrequency.FREQ_32000: 32000,
SamplingFrequency.FREQ_44100: 44100,
SamplingFrequency.FREQ_48000: 48000,
SamplingFrequency.FREQ_88200: 88200,
SamplingFrequency.FREQ_96000: 96000,
SamplingFrequency.FREQ_176400: 176400,
SamplingFrequency.FREQ_192000: 192000,
SamplingFrequency.FREQ_384000: 384000,
}[self]
class SupportedSamplingFrequency(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency'''
# fmt: off
FREQ_8000 = 1 << (SamplingFrequency.FREQ_8000 - 1)
FREQ_11025 = 1 << (SamplingFrequency.FREQ_11025 - 1)
FREQ_16000 = 1 << (SamplingFrequency.FREQ_16000 - 1)
FREQ_22050 = 1 << (SamplingFrequency.FREQ_22050 - 1)
FREQ_24000 = 1 << (SamplingFrequency.FREQ_24000 - 1)
FREQ_32000 = 1 << (SamplingFrequency.FREQ_32000 - 1)
FREQ_44100 = 1 << (SamplingFrequency.FREQ_44100 - 1)
FREQ_48000 = 1 << (SamplingFrequency.FREQ_48000 - 1)
FREQ_88200 = 1 << (SamplingFrequency.FREQ_88200 - 1)
FREQ_96000 = 1 << (SamplingFrequency.FREQ_96000 - 1)
FREQ_176400 = 1 << (SamplingFrequency.FREQ_176400 - 1)
FREQ_192000 = 1 << (SamplingFrequency.FREQ_192000 - 1)
FREQ_384000 = 1 << (SamplingFrequency.FREQ_384000 - 1)
# fmt: on
@classmethod
def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency:
MAPPING = {
8000: SupportedSamplingFrequency.FREQ_8000,
11025: SupportedSamplingFrequency.FREQ_11025,
16000: SupportedSamplingFrequency.FREQ_16000,
22050: SupportedSamplingFrequency.FREQ_22050,
24000: SupportedSamplingFrequency.FREQ_24000,
32000: SupportedSamplingFrequency.FREQ_32000,
44100: SupportedSamplingFrequency.FREQ_44100,
48000: SupportedSamplingFrequency.FREQ_48000,
88200: SupportedSamplingFrequency.FREQ_88200,
96000: SupportedSamplingFrequency.FREQ_96000,
176400: SupportedSamplingFrequency.FREQ_176400,
192000: SupportedSamplingFrequency.FREQ_192000,
384000: SupportedSamplingFrequency.FREQ_384000,
}
return functools.reduce(
lambda x, y: x | MAPPING[y],
frequencies,
cls(0),
)
class FrameDuration(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration'''
# fmt: off
DURATION_7500_US = 0x00
DURATION_10000_US = 0x01
class SupportedFrameDuration(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''
# fmt: off
DURATION_7500_US_SUPPORTED = 0b0001
DURATION_10000_US_SUPPORTED = 0b0010
DURATION_7500_US_PREFERRED = 0b0001
DURATION_10000_US_PREFERRED = 0b0010
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def bits_to_channel_counts(data: int) -> List[int]:
pos = 0
counts = []
while data != 0:
# Bit 0 = count 1
# Bit 1 = count 2, and so on
pos += 1
if data & 1:
counts.append(pos)
data >>= 1
return counts
def channel_counts_to_bits(counts: Sequence[int]) -> int:
return sum(set([1 << (count - 1) for count in counts]))
# -----------------------------------------------------------------------------
# Structures
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CodecSpecificCapabilities:
'''See:
* Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures
* Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements
'''
class Type(enum.IntEnum):
# fmt: off
SAMPLING_FREQUENCY = 0x01
FRAME_DURATION = 0x02
AUDIO_CHANNEL_COUNT = 0x03
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
supported_sampling_frequencies: SupportedSamplingFrequency
supported_frame_durations: SupportedFrameDuration
supported_audio_channel_counts: Sequence[int]
min_octets_per_codec_frame: int
max_octets_per_codec_frame: int
supported_max_codec_frames_per_sdu: int
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities:
offset = 0
# Allowed default values.
supported_audio_channel_counts = [1]
supported_max_codec_frames_per_sdu = 1
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
value = int.from_bytes(data[offset : offset + length - 1], 'little')
offset += length - 1
if type == CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY:
supported_sampling_frequencies = SupportedSamplingFrequency(value)
elif type == CodecSpecificCapabilities.Type.FRAME_DURATION:
supported_frame_durations = SupportedFrameDuration(value)
elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT:
supported_audio_channel_counts = bits_to_channel_counts(value)
elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME:
min_octets_per_sample = value & 0xFFFF
max_octets_per_sample = value >> 16
elif type == CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU:
supported_max_codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
return CodecSpecificCapabilities(
supported_sampling_frequencies=supported_sampling_frequencies,
supported_frame_durations=supported_frame_durations,
supported_audio_channel_counts=supported_audio_channel_counts,
min_octets_per_codec_frame=min_octets_per_sample,
max_octets_per_codec_frame=max_octets_per_sample,
supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu,
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBHBBBBBBBBHHBBB',
3,
CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY,
self.supported_sampling_frequencies,
2,
CodecSpecificCapabilities.Type.FRAME_DURATION,
self.supported_frame_durations,
2,
CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT,
channel_counts_to_bits(self.supported_audio_channel_counts),
5,
CodecSpecificCapabilities.Type.OCTETS_PER_FRAME,
self.min_octets_per_codec_frame,
self.max_octets_per_codec_frame,
2,
CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU,
self.supported_max_codec_frames_per_sdu,
)
@dataclasses.dataclass
class PacRecord:
coding_format: hci.CodingFormat
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
# TODO: Parse Metadata
metadata: bytes = b''
@classmethod
def from_bytes(cls, data: bytes) -> PacRecord:
offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0)
codec_specific_capabilities_size = data[offset]
offset += 1
codec_specific_capabilities_bytes = data[
offset : offset + codec_specific_capabilities_size
]
offset += codec_specific_capabilities_size
metadata_size = data[offset]
metadata = data[offset : offset + metadata_size]
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC:
codec_specific_capabilities = codec_specific_capabilities_bytes
else:
codec_specific_capabilities = CodecSpecificCapabilities.from_bytes(
codec_specific_capabilities_bytes
)
return PacRecord(
coding_format=coding_format,
codec_specific_capabilities=codec_specific_capabilities,
metadata=metadata,
)
def __bytes__(self) -> bytes:
capabilities_bytes = bytes(self.codec_specific_capabilities)
return (
bytes(self.coding_format)
+ bytes([len(capabilities_bytes)])
+ capabilities_bytes
+ bytes([len(self.metadata)])
+ self.metadata
)
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class PublishedAudioCapabilitiesService(gatt.TemplateService):
UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE
sink_pac: Optional[gatt.Characteristic]
sink_audio_locations: Optional[gatt.Characteristic]
source_pac: Optional[gatt.Characteristic]
source_audio_locations: Optional[gatt.Characteristic]
available_audio_contexts: gatt.Characteristic
supported_audio_contexts: gatt.Characteristic
def __init__(
self,
supported_source_context: ContextType,
supported_sink_context: ContextType,
available_source_context: ContextType,
available_sink_context: ContextType,
sink_pac: Sequence[PacRecord] = [],
sink_audio_locations: Optional[AudioLocation] = None,
source_pac: Sequence[PacRecord] = [],
source_audio_locations: Optional[AudioLocation] = None,
) -> None:
characteristics = []
self.supported_audio_contexts = gatt.Characteristic(
uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<HH', supported_sink_context, supported_source_context),
)
characteristics.append(self.supported_audio_contexts)
self.available_audio_contexts = gatt.Characteristic(
uuid=gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<HH', available_sink_context, available_source_context),
)
characteristics.append(self.available_audio_contexts)
if sink_pac:
self.sink_pac = gatt.Characteristic(
uuid=gatt.GATT_SINK_PAC_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([len(sink_pac)]) + b''.join(map(bytes, sink_pac)),
)
characteristics.append(self.sink_pac)
if sink_audio_locations is not None:
self.sink_audio_locations = gatt.Characteristic(
uuid=gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<I', sink_audio_locations),
)
characteristics.append(self.sink_audio_locations)
if source_pac:
self.source_pac = gatt.Characteristic(
uuid=gatt.GATT_SOURCE_PAC_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([len(source_pac)]) + b''.join(map(bytes, source_pac)),
)
characteristics.append(self.source_pac)
if source_audio_locations is not None:
self.source_audio_locations = gatt.Characteristic(
uuid=gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<I', source_audio_locations),
)
characteristics.append(self.source_audio_locations)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = PublishedAudioCapabilitiesService
sink_pac: Optional[gatt_client.CharacteristicProxy] = None
sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
source_pac: Optional[gatt_client.CharacteristicProxy] = None
source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
available_audio_contexts: gatt_client.CharacteristicProxy
supported_audio_contexts: gatt_client.CharacteristicProxy
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
self.available_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_PAC_CHARACTERISTIC
):
self.sink_pac = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_PAC_CHARACTERISTIC
):
self.source_pac = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
):
self.sink_audio_locations = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
):
self.source_audio_locations = characteristics[0]
+147
View File
@@ -0,0 +1,147 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import Optional
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class SirkType(enum.IntEnum):
'''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.'''
ENCRYPTED = 0x00
PLAINTEXT = 0x01
class MemberLock(enum.IntEnum):
'''Coordinated Set Identification Service - 5.3 Set Member Lock.'''
UNLOCKED = 0x01
LOCKED = 0x02
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
# TODO: Implement RSI Generator
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class CoordinatedSetIdentificationService(gatt.TemplateService):
UUID = gatt.GATT_COORDINATED_SET_IDENTIFICATION_SERVICE
set_identity_resolving_key_characteristic: gatt.Characteristic
coordinated_set_size_characteristic: Optional[gatt.Characteristic] = None
set_member_lock_characteristic: Optional[gatt.Characteristic] = None
set_member_rank_characteristic: Optional[gatt.Characteristic] = None
def __init__(
self,
set_identity_resolving_key: bytes,
coordinated_set_size: Optional[int] = None,
set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None,
) -> None:
characteristics = []
self.set_identity_resolving_key_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
# TODO: Implement encrypted SIRK reader.
value=struct.pack('B', SirkType.PLAINTEXT) + set_identity_resolving_key,
)
characteristics.append(self.set_identity_resolving_key_characteristic)
if coordinated_set_size is not None:
self.coordinated_set_size_characteristic = gatt.Characteristic(
uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('B', coordinated_set_size),
)
characteristics.append(self.coordinated_set_size_characteristic)
if set_member_lock is not None:
self.set_member_lock_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY
| gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.READABLE
| gatt.Characteristic.Permissions.WRITEABLE,
value=struct.pack('B', set_member_lock),
)
characteristics.append(self.set_member_lock_characteristic)
if set_member_rank is not None:
self.set_member_rank_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('B', set_member_rank),
)
characteristics.append(self.set_member_rank_characteristic)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = CoordinatedSetIdentificationService
set_identity_resolving_key: gatt_client.CharacteristicProxy
coordinated_set_size: Optional[gatt_client.CharacteristicProxy] = None
set_member_lock: Optional[gatt_client.CharacteristicProxy] = None
set_member_rank: Optional[gatt_client.CharacteristicProxy] = None
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.set_identity_resolving_key = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC
)[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC
):
self.coordinated_set_size = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC
):
self.set_member_lock = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC
):
self.set_member_rank = characteristics[0]
+67 -44
View File
@@ -187,8 +187,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032')
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# fmt: on
# pylint: enable=line-too-long
@@ -579,7 +579,7 @@ class OobContext:
self.r = crypto.r() if r is None else r
def share(self) -> OobSharedData:
pkx = bytes(reversed(self.ecc_key.x))
pkx = self.ecc_key.x[::-1]
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
@@ -677,6 +677,13 @@ class Session:
},
}
ea: bytes
eb: bytes
ltk: bytes
preq: bytes
pres: bytes
tk: bytes
def __init__(
self,
manager: Manager,
@@ -686,15 +693,10 @@ class Session:
) -> None:
self.manager = manager
self.connection = connection
self.preq: Optional[bytes] = None
self.pres: Optional[bytes] = None
self.ea = None
self.eb = None
self.stk = None
self.ltk = None
self.ltk_ediv = 0
self.ltk_rand = bytes(8)
self.link_key = None
self.link_key: Optional[bytes] = None
self.initiator_key_distribution: int = 0
self.responder_key_distribution: int = 0
self.peer_random_value: Optional[bytes] = None
@@ -787,9 +789,7 @@ class Session:
)
self.r = pairing_config.oob.our_context.r
self.ecc_key = pairing_config.oob.our_context.ecc_key
if pairing_config.oob.legacy_context is None:
self.tk = None
else:
if pairing_config.oob.legacy_context is not None:
self.tk = pairing_config.oob.legacy_context.tk
else:
if pairing_config.oob.legacy_context is None:
@@ -807,7 +807,7 @@ class Session:
@property
def pkx(self) -> Tuple[bytes, bytes]:
return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x)
return (self.ecc_key.x[::-1], self.peer_public_key_x)
@property
def pka(self) -> bytes:
@@ -1061,8 +1061,8 @@ class Session:
def send_public_key_command(self) -> None:
self.send_command(
SMP_Pairing_Public_Key_Command(
public_key_x=bytes(reversed(self.ecc_key.x)),
public_key_y=bytes(reversed(self.ecc_key.y)),
public_key_x=self.ecc_key.x[::-1],
public_key_y=self.ecc_key.y[::-1],
)
)
@@ -1090,7 +1090,7 @@ class Session:
# We can now encrypt the connection with the short term key, so that we can
# distribute the long term and/or other keys over an encrypted connection
self.manager.device.host.send_command_sync(
HCI_LE_Enable_Encryption_Command( # type: ignore[call-arg]
HCI_LE_Enable_Encryption_Command(
connection_handle=self.connection.handle,
random_number=bytes(8),
encrypted_diversifier=0,
@@ -1098,15 +1098,52 @@ class Session:
)
)
async def derive_ltk(self) -> None:
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
assert link_key is not None
@classmethod
def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
'''Derives Long Term Key from Link Key.
Args:
link_key: BR/EDR Link Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
LE Long Tern Key bytes in little-endian.
'''
ilk = (
crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
if self.ct2
if ct2
else crypto.h6(link_key, b'tmp2')
)
self.ltk = crypto.h6(ilk, b'brle')
return crypto.h6(ilk, b'brle')
@classmethod
def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
'''Derives Link Key from Long Term Key.
Args:
ltk: LE Long Term Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
BR/EDR Link Key bytes in little-endian.
'''
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
if ct2
else crypto.h6(ltk, b'tmp1')
)
return crypto.h6(ilk, b'lebr')
async def get_link_key_and_derive_ltk(self) -> None:
'''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
if link_key is None:
logging.warning(
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
)
self.send_pairing_failed(
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
)
else:
self.ltk = self.derive_ltk(link_key, self.ct2)
def distribute_keys(self) -> None:
# Distribute the keys as required
@@ -1117,7 +1154,7 @@ class Session:
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
):
self.ctkd_task = self.connection.abort_on(
'disconnection', self.derive_ltk()
'disconnection', self.get_link_key_and_derive_ltk()
)
elif not self.sc:
# Distribute the LTK, EDIV and RAND
@@ -1147,12 +1184,7 @@ class Session:
# CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
self.link_key = self.derive_link_key(self.ltk, self.ct2)
else:
# CTKD: Derive LTK from LinkKey
@@ -1161,7 +1193,7 @@ class Session:
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
):
self.ctkd_task = self.connection.abort_on(
'disconnection', self.derive_ltk()
'disconnection', self.get_link_key_and_derive_ltk()
)
# Distribute the LTK, EDIV and RAND
elif not self.sc:
@@ -1191,12 +1223,7 @@ class Session:
# CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
self.link_key = self.derive_link_key(self.ltk, self.ct2)
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
# Set our expectations for what to wait for in the key distribution phase
@@ -1754,14 +1781,10 @@ class Session:
self.peer_public_key_y = command.public_key_y
# Compute the DH key
self.dh_key = bytes(
reversed(
self.ecc_key.dh(
bytes(reversed(command.public_key_x)),
bytes(reversed(command.public_key_y)),
)
)
)
self.dh_key = self.ecc_key.dh(
command.public_key_x[::-1],
command.public_key_y[::-1],
)[::-1]
logger.debug(f'DH key: {self.dh_key.hex()}')
if self.pairing_method == PairingMethod.OOB:
@@ -1824,7 +1847,6 @@ class Session:
else:
self.send_pairing_dhkey_check_command()
else:
assert self.ltk
self.start_encryption(self.ltk)
def on_smp_pairing_failed_command(
@@ -1874,6 +1896,7 @@ class Manager(EventEmitter):
sessions: Dict[int, Session]
pairing_config_factory: Callable[[Connection], PairingConfig]
session_proxy: Type[Session]
_ecc_key: Optional[crypto.EccKey]
def __init__(
self,
+2 -1
View File
@@ -42,6 +42,7 @@ HCI_PACKET_INFO: Dict[int, Tuple[int, int, str]] = {
hci.HCI_ACL_DATA_PACKET: (2, 2, 'H'),
hci.HCI_SYNCHRONOUS_DATA_PACKET: (1, 2, 'B'),
hci.HCI_EVENT_PACKET: (1, 1, 'B'),
hci.HCI_ISO_DATA_PACKET: (2, 2, 'H'),
}
@@ -150,7 +151,7 @@ class PacketParser:
try:
self.sink.on_packet(bytes(self.packet))
except Exception as error:
logger.warning(
logger.exception(
color(f'!!! Exception in on_packet: {error}', 'red')
)
self.reset()
+58 -61
View File
@@ -24,9 +24,10 @@ import platform
import usb1
from .common import Transport, ParserSource
from .. import hci
from ..colors import color
from bumble.transport.common import Transport, ParserSource
from bumble import hci
from bumble.colors import color
from bumble.utils import AsyncRunner
# -----------------------------------------------------------------------------
@@ -113,7 +114,7 @@ async def open_usb_transport(spec: str) -> Transport:
def __init__(self, device, acl_out):
self.device = device
self.acl_out = acl_out
self.transfer = device.getTransfer()
self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.cancel_done = self.loop.create_future()
@@ -137,21 +138,20 @@ async def open_usb_transport(spec: str) -> Transport:
# The queue was previously empty, re-prime the pump
self.process_queue()
def on_packet_sent(self, transfer):
def transfer_callback(self, transfer):
status = transfer.getStatus()
# logger.debug(f'<<< USB out transfer callback: status={status}')
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent_)
self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else:
logger.warning(
color(f'!!! out transfer not completed: status={status}', 'red')
color(f'!!! OUT transfer not completed: status={status}', 'red')
)
def on_packet_sent_(self):
def on_packet_sent(self):
if self.packets:
self.packets.popleft()
self.process_queue()
@@ -163,22 +163,20 @@ async def open_usb_transport(spec: str) -> Transport:
packet = self.packets[0]
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk(
self.acl_out, packet[1:], callback=self.on_packet_sent
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
logger.debug('submit ACL')
self.transfer.submit()
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.transfer.setControl(
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.on_packet_sent,
callback=self.transfer_callback,
)
logger.debug('submit COMMAND')
self.transfer.submit()
self.acl_out_transfer.submit()
else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
@@ -193,11 +191,11 @@ async def open_usb_transport(spec: str) -> Transport:
self.packets.clear()
# If we have a transfer in flight, cancel it
if self.transfer.isSubmitted():
if self.acl_out_transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have
# already completed
try:
self.transfer.cancel()
self.acl_out_transfer.cancel()
logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done
@@ -206,27 +204,22 @@ async def open_usb_transport(spec: str) -> Transport:
logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, metadata, acl_in, events_in):
def __init__(self, device, metadata, acl_in, events_in):
super().__init__()
self.context = context
self.device = device
self.metadata = metadata
self.acl_in = acl_in
self.acl_in_transfer = None
self.events_in = events_in
self.events_in_transfer = None
self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue()
self.dequeue_task = None
self.closed = False
self.event_loop_done = self.loop.create_future()
self.cancel_done = {
hci.HCI_EVENT_PACKET: self.loop.create_future(),
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
}
self.events_in_transfer = None
self.acl_in_transfer = None
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.closed = False
def start(self):
# Set up transfer objects for input
@@ -234,7 +227,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.events_in_transfer.setInterrupt(
self.events_in,
READ_SIZE,
callback=self.on_packet_received,
callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET,
)
self.events_in_transfer.submit()
@@ -243,22 +236,23 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_in_transfer.setBulk(
self.acl_in,
READ_SIZE,
callback=self.on_packet_received,
callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET,
)
self.acl_in_transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue())
self.event_thread.start()
def on_packet_received(self, transfer):
@property
def usb_transfer_submitted(self):
return (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
)
def transfer_callback(self, transfer):
packet_type = transfer.getUserData()
status = transfer.getStatus()
# logger.debug(
# f'<<< USB IN transfer callback: status={status} '
# f'packet_type={packet_type} '
# f'length={transfer.getActualLength()}'
# )
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
@@ -267,18 +261,18 @@ async def open_usb_transport(spec: str) -> Transport:
+ transfer.getBuffer()[: transfer.getActualLength()]
)
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
# Re-submit the transfer so we can receive more data
transfer.submit()
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(
self.cancel_done[packet_type].set_result, None
)
return
else:
logger.warning(
color(f'!!! transfer not completed: status={status}', 'red')
color(f'!!! IN transfer not completed: status={status}', 'red')
)
# Re-submit the transfer so we can receive more data
transfer.submit()
self.loop.call_soon_threadsafe(self.on_transport_lost)
async def dequeue(self):
while not self.closed:
@@ -288,21 +282,6 @@ async def open_usb_transport(spec: str) -> Transport:
return
self.parser.feed_data(packet)
def run(self):
logger.debug('starting USB event loop')
while (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
):
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
def close(self):
self.closed = True
@@ -331,15 +310,14 @@ async def open_usb_transport(spec: str) -> Transport:
f'IN[{packet_type}] transfer likely already completed'
)
# Wait for the thread to terminate
await self.event_loop_done
class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink)
self.context = context
self.device = device
self.interface = interface
self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future()
# Get exclusive access
device.claimInterface(interface)
@@ -352,6 +330,22 @@ async def open_usb_transport(spec: str) -> Transport:
source.start()
sink.start()
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.event_thread.start()
def run(self):
logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted:
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self):
self.source.close()
self.sink.close()
@@ -361,6 +355,9 @@ async def open_usb_transport(spec: str) -> Transport:
self.device.close()
self.context.close()
# Wait for the thread to terminate
await self.event_loop_done
# Find the device according to the spec moniker
load_libusb()
context = usb1.USBContext()
@@ -540,7 +537,7 @@ async def open_usb_transport(spec: str) -> Transport:
except usb1.USBError:
logger.warning('failed to set configuration')
source = UsbPacketSource(context, device, device_metadata, acl_in, events_in)
source = UsbPacketSource(device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error:
+17 -1
View File
@@ -432,7 +432,7 @@ def wrap_async(function):
def deprecated(msg: str):
"""
Throw deprecation warning before execution
Throw deprecation warning before execution.
"""
def wrapper(function):
@@ -444,3 +444,19 @@ def deprecated(msg: str):
return inner
return wrapper
def experimental(msg: str):
"""
Throws a future warning before execution.
"""
def wrapper(function):
@wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)
return inner
return wrapper
+53 -13
View File
@@ -1,19 +1,19 @@
ANDROID REMOTE HCI APP
======================
This application allows using an android phone's built-in Bluetooth controller with
This application allows using an android phone's built-in Bluetooth controller with
a Bumble host stack running outside the phone (typically a development laptop or desktop).
The app runs an HCI proxy between a TCP socket on the "outside" and the Bluetooth HCI HAL
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
description of the Android Bluetooth HCI HAL).
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
packets coming from the controller are forwarded to the TCP socket.
Building
--------
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `RemoteHCI` top level directory.
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `extras/android/RemoteHCI` top level directory.
You can also build with Android Studio: open the `RemoteHCI` project. You can build and/or debug from there.
If the build succeeds, you can find the app APKs (debug and release) at:
@@ -25,9 +25,23 @@ If the build succeeds, you can find the app APKs (debug and release) at:
Running
-------
!!! note
In the following examples, it is assumed that shell commands are executed while in the
app's root directory, `extras/android/RemoteHCI`. If you are in a different directory,
adjust the relative paths accordingly.
### Preconditions
When the proxy starts (tapping the "Start" button in the app's main activity), it will try to
bind to the Bluetooth HAL. This requires disabling SELinux temporarily, and being the only HAL client.
When the proxy starts (tapping the "Start" button in the app's main activity, or running the proxy
from an `adb shell` command line), it will try to bind to the Bluetooth HAL.
This requires that there is no other HAL client, and requires certain privileges.
For running as a regular app, this requires disabling SELinux temporarily.
For running as a command-line executable, this just requires a root shell.
#### Root Shell
!!! tip "Restart `adb` as root"
```bash
$ adb root
```
#### Disabling SELinux
Binding to the Bluetooth HCI HAL requires certain SELinux permissions that can't simply be changed
@@ -56,8 +70,8 @@ development phone).
This state will also reset to the normal SELinux enforcement when you reboot.
#### Stopping the bluetooth process
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Android bluetooth stack process.
!!! tip "Checking if the Bluetooth process is running"
@@ -79,7 +93,33 @@ Airplane Mode, then rebooting. The bluetooth process should, in theory, not rest
$ adb shell cmd bluetooth_manager disable
```
### Starting the app
### Running as a command line app
You push the built APK to a temporary location on the phone's filesystem, then launch the command
line executable with an `adb shell` command.
!!! tip "Pushing the executable"
```bash
$ adb push app/build/outputs/apk/release/app-release-unsigned.apk /data/local/tmp/remotehci.apk
```
Do this every time you rebuild. Alternatively, you can push the `debug` APK instead:
```bash
$ adb push app/build/outputs/apk/debug/app-debug.apk /data/local/tmp/remotehci.apk
```
!!! tip "Start the proxy from the command line"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface"
```
This will run the proxy, listening on the default TCP port.
If you want a different port, pass it as a command line parameter
!!! tip "Start the proxy from the command line with a specific TCP port"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface 12345"
```
### Running as a normal app
You can start the app from the Android launcher, from Android Studio, or with `adb`
#### Launching from the launcher
@@ -103,11 +143,11 @@ automatically start the proxy, and/or set the port number.
#### Selecting a TCP port
The RemoteHCI app's main activity has a "TCP Port" setting where you can change the port on
which the proxy is accepting connections. If the default value isn't suitable, you can
which the proxy is accepting connections. If the default value isn't suitable, you can
change it there (you can also use the special value 0 to let the OS assign a port number for you).
### Connecting to the proxy
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
stack. This can be done over the phone's WiFi connection, or, alternatively, using an `adb`
TCP forward (which should be faster than over WiFi).
@@ -116,7 +156,7 @@ TCP forward (which should be faster than over WiFi).
```bash
$ adb forward tcp:<outside-port> tcp:<inside-port>
```
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
desktop machine, and <inside-port> is the TCP port selected in the app's user interface.
Those two ports may be the same, of course.
For example, with the default TCP port 9993:
@@ -125,7 +165,7 @@ TCP forward (which should be faster than over WiFi).
```
Once you've ensured that you can reach the proxy's TCP port on the phone, either directly or
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
``tcp-client:<host>:<port>`` syntax.
!!! example "Connecting a Bumble client"
+5
View File
@@ -0,0 +1,5 @@
{
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"advertising_interval": 100
}
+107
View File
@@ -0,0 +1,107 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import (
Device,
Connection,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_cig_setup.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_cig_setup.py device1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].cis_enabled = True
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC
)
cid_ids = [2, 3]
cis_handles = await devices[1].setup_cig(
cig_id=1,
cis_id=cid_ids,
sdu_interval=(10000, 0),
framing=0,
max_sdu=(120, 0),
retransmission_number=13,
max_transport_latency=(100, 0),
)
def on_cis_request(
connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int
):
connection.abort_on('disconnection', devices[0].accept_cis_request(cis_handle))
devices[0].on('cis_request', on_cis_request)
cis_links = await devices[1].create_cis(
[(cis, connection.handle) for cis in cis_handles]
)
for cis_link in cis_links:
await cis_link.disconnect()
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+86
View File
@@ -0,0 +1,86 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import dataclasses
import logging
import sys
import os
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Device, ScoLink
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hfp import DefaultCodecParameters, ESCO_PARAMETERS
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_esco_connection.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_esco_connection.py classic1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
connections = await asyncio.gather(
devices[0].accept(devices[1].public_address),
devices[1].connect(devices[0].public_address, transport=BT_BR_EDR_TRANSPORT),
)
def on_sco(sco_link: ScoLink):
connections[0].abort_on('disconnection', sco_link.disconnect())
devices[0].once('sco_connection', on_sco)
await devices[0].send_command(
HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(),
)
)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+69
View File
@@ -0,0 +1,69 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_extended_advertiser.py <config-file> <transport-spec> [type] [address]'
)
print('example: run_extended_advertiser.py device1.json usb:0')
return
if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
if len(sys.argv) >= 5:
target = Address(sys.argv[4])
else:
target = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+134
View File
@@ -0,0 +1,134 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.bap import (
CodecSpecificCapabilities,
ContextType,
AudioLocation,
SupportedSamplingFrequency,
SupportedFrameDuration,
PacRecord,
PublishedAudioCapabilitiesService,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_cig_setup.py <config-file>' '<transport-spec-for-device>')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.cis_enabled = True
await device.power_on()
device.add_service(
PublishedAudioCapabilitiesService(
supported_source_context=ContextType.PROHIBITED,
available_source_context=ContextType.PROHIBITED,
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
sink_audio_locations=(
AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT
),
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,
),
),
],
)
)
advertising_data = bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
)
)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
@@ -16,17 +16,77 @@ package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import java.io.IOException
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCallback
import android.bluetooth.BluetoothProfile
import android.content.Context
import android.os.Build
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.l2cap-client")
class L2capClient(private val viewModel: AppViewModel, val bluetoothAdapter: BluetoothAdapter) {
class L2capClient(
private val viewModel: AppViewModel,
val bluetoothAdapter: BluetoothAdapter,
val context: Context
) {
@SuppressLint("MissingPermission")
fun run() {
viewModel.running = true
val remoteDevice = bluetoothAdapter.getRemoteDevice(viewModel.peerBluetoothAddress)
val addressIsPublic = viewModel.peerBluetoothAddress.endsWith("/P")
val address = viewModel.peerBluetoothAddress.take(17)
val remoteDevice = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
bluetoothAdapter.getRemoteLeDevice(
address,
if (addressIsPublic) {
BluetoothDevice.ADDRESS_TYPE_PUBLIC
} else {
BluetoothDevice.ADDRESS_TYPE_RANDOM
}
)
} else {
bluetoothAdapter.getRemoteDevice(address)
}
val gatt = remoteDevice.connectGatt(
context,
false,
object : BluetoothGattCallback() {
override fun onMtuChanged(gatt: BluetoothGatt, mtu: Int, status: Int) {
Log.info("MTU update: mtu=$mtu status=$status")
viewModel.mtu = mtu
}
override fun onPhyUpdate(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY update: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
override fun onPhyRead(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
override fun onConnectionStateChange(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
gatt.readPhy()
}
}
},
BluetoothDevice.TRANSPORT_LE,
if (viewModel.use2mPhy) BluetoothDevice.PHY_LE_2M_MASK else BluetoothDevice.PHY_LE_1M_MASK
)
val socket = remoteDevice.createInsecureL2capChannel(viewModel.l2capPsm)
val client = SocketClient(viewModel, socket)
@@ -30,7 +30,7 @@ private val Log = Logger.getLogger("btbench.l2cap-server")
class L2capServer(private val viewModel: AppViewModel, private val bluetoothAdapter: BluetoothAdapter) {
@SuppressLint("MissingPermission")
fun run() {
// Advertise to that the peer can find us and connect.
// Advertise so that the peer can find us and connect.
val callback = object: AdvertiseCallback() {
override fun onStartFailure(errorCode: Int) {
Log.warning("failed to start advertising: $errorCode")
@@ -50,13 +50,12 @@ class L2capServer(private val viewModel: AppViewModel, private val bluetoothAdap
val advertiseData = AdvertiseData.Builder().build()
val scanData = AdvertiseData.Builder().setIncludeDeviceName(true).build()
val advertiser = bluetoothAdapter.bluetoothLeAdvertiser
advertiser.startAdvertising(advertiseSettings, advertiseData, scanData, callback)
val serverSocket = bluetoothAdapter.listenUsingInsecureL2capChannel()
viewModel.l2capPsm = serverSocket.psm
Log.info("psm = $serverSocket.psm")
val server = SocketServer(viewModel, serverSocket)
server.run({ advertiser.stopAdvertising(callback) })
server.run({ advertiser.stopAdvertising(callback) }, { advertiser.startAdvertising(advertiseSettings, advertiseData, scanData, callback) })
}
}
@@ -26,23 +26,33 @@ import android.os.Bundle
import androidx.activity.ComponentActivity
import androidx.activity.compose.setContent
import androidx.activity.result.contract.ActivityResultContracts
import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.Spacer
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.rememberScrollState
import androidx.compose.foundation.text.KeyboardActions
import androidx.compose.foundation.text.KeyboardOptions
import androidx.compose.foundation.verticalScroll
import androidx.compose.material3.Button
import androidx.compose.material3.Divider
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.Slider
import androidx.compose.material3.Surface
import androidx.compose.material3.Switch
import androidx.compose.material3.Text
import androidx.compose.material3.TextField
import androidx.compose.runtime.Composable
import androidx.compose.runtime.remember
import androidx.compose.ui.Alignment
import androidx.compose.ui.ExperimentalComposeUiApi
import androidx.compose.ui.Modifier
import androidx.compose.ui.focus.FocusRequester
import androidx.compose.ui.focus.focusRequester
import androidx.compose.ui.platform.LocalFocusManager
import androidx.compose.ui.platform.LocalSoftwareKeyboardController
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.text.input.ImeAction
@@ -171,7 +181,7 @@ class MainActivity : ComponentActivity() {
}
private fun runL2capClient() {
val l2capClient = bluetoothAdapter?.let { L2capClient(appViewModel, it) }
val l2capClient = bluetoothAdapter?.let { L2capClient(appViewModel, it, baseContext) }
l2capClient?.run()
}
@@ -199,9 +209,12 @@ fun MainView(
runL2capServer: () -> Unit
) {
BTBenchTheme {
// A surface container using the 'background' color from the theme
val scrollState = rememberScrollState()
Surface(
modifier = Modifier.fillMaxSize(), color = MaterialTheme.colorScheme.background
modifier = Modifier
.fillMaxSize()
.verticalScroll(scrollState),
color = MaterialTheme.colorScheme.background
) {
Column(modifier = Modifier.padding(horizontal = 16.dp)) {
Text(
@@ -212,28 +225,33 @@ fun MainView(
)
Divider()
val keyboardController = LocalSoftwareKeyboardController.current
TextField(label = {
Text(text = "Peer Bluetooth Address")
},
val focusRequester = remember { FocusRequester() }
val focusManager = LocalFocusManager.current
TextField(
label = {
Text(text = "Peer Bluetooth Address")
},
value = appViewModel.peerBluetoothAddress,
modifier = Modifier.fillMaxWidth(),
modifier = Modifier.fillMaxWidth().focusRequester(focusRequester),
keyboardOptions = KeyboardOptions.Default.copy(
keyboardType = KeyboardType.Ascii, imeAction = ImeAction.Done
),
onValueChange = {
appViewModel.updatePeerBluetoothAddress(it)
},
keyboardActions = KeyboardActions(onDone = { keyboardController?.hide() })
keyboardActions = KeyboardActions(onDone = {
keyboardController?.hide()
focusManager.clearFocus()
})
)
Divider()
TextField(label = {
Text(text = "L2CAP PSM")
},
value = appViewModel.l2capPsm.toString(),
modifier = Modifier.fillMaxWidth(),
modifier = Modifier.fillMaxWidth().focusRequester(focusRequester),
keyboardOptions = KeyboardOptions.Default.copy(
keyboardType = KeyboardType.Number,
imeAction = ImeAction.Done
keyboardType = KeyboardType.Number, imeAction = ImeAction.Done
),
onValueChange = {
if (it.isNotEmpty()) {
@@ -243,7 +261,11 @@ fun MainView(
}
}
},
keyboardActions = KeyboardActions(onDone = { keyboardController?.hide() }))
keyboardActions = KeyboardActions(onDone = {
keyboardController?.hide()
focusManager.clearFocus()
})
)
Divider()
Slider(
value = appViewModel.senderPacketCountSlider, onValueChange = {
@@ -264,7 +286,19 @@ fun MainView(
ActionButton(
text = "Become Discoverable", onClick = becomeDiscoverable, true
)
Row() {
Row(
horizontalArrangement = Arrangement.SpaceBetween,
verticalAlignment = Alignment.CenterVertically
) {
Text(text = "2M PHY")
Spacer(modifier = Modifier.padding(start = 8.dp))
Switch(
checked = appViewModel.use2mPhy,
onCheckedChange = { appViewModel.use2mPhy = it }
)
}
Row {
ActionButton(
text = "RFCOMM Client", onClick = runRfcommClient, !appViewModel.running
)
@@ -272,7 +306,7 @@ fun MainView(
text = "RFCOMM Server", onClick = runRfcommServer, !appViewModel.running
)
}
Row() {
Row {
ActionButton(
text = "L2CAP Client", onClick = runL2capClient, !appViewModel.running
)
@@ -281,6 +315,12 @@ fun MainView(
)
}
Divider()
Text(
text = if (appViewModel.mtu != 0) "MTU: ${appViewModel.mtu}" else ""
)
Text(
text = if (appViewModel.rxPhy != 0 || appViewModel.txPhy != 0) "PHY: tx=${appViewModel.txPhy}, rx=${appViewModel.rxPhy}" else ""
)
Text(
text = "Packets Sent: ${appViewModel.packetsSent}"
)
@@ -32,6 +32,10 @@ class AppViewModel : ViewModel() {
private var preferences: SharedPreferences? = null
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var l2capPsm by mutableStateOf(0)
var use2mPhy by mutableStateOf(true)
var mtu by mutableStateOf(0)
var rxPhy by mutableStateOf(0)
var txPhy by mutableStateOf(0)
var senderPacketCountSlider by mutableFloatStateOf(0.0F)
var senderPacketSizeSlider by mutableFloatStateOf(0.0F)
var senderPacketCount by mutableIntStateOf(DEFAULT_SENDER_PACKET_COUNT)
@@ -64,11 +68,12 @@ class AppViewModel : ViewModel() {
}
fun updatePeerBluetoothAddress(peerBluetoothAddress: String) {
this.peerBluetoothAddress = peerBluetoothAddress
val address = peerBluetoothAddress.uppercase()
this.peerBluetoothAddress = address
// Save the address to the preferences
with(preferences!!.edit()) {
putString(PEER_BLUETOOTH_ADDRESS_PREF_KEY, peerBluetoothAddress)
putString(PEER_BLUETOOTH_ADDRESS_PREF_KEY, address)
apply()
}
}
@@ -116,7 +121,7 @@ class AppViewModel : ViewModel() {
}
fun updateSenderPacketSizeSlider() {
if (senderPacketSize <= 1) {
if (senderPacketSize <= 16) {
senderPacketSizeSlider = 0.0F
} else if (senderPacketSize <= 256) {
senderPacketSizeSlider = 0.02F
@@ -138,7 +143,7 @@ class AppViewModel : ViewModel() {
fun updateSenderPacketSize() {
if (senderPacketSizeSlider < 0.1F) {
senderPacketSize = 1
senderPacketSize = 16
} else if (senderPacketSizeSlider < 0.3F) {
senderPacketSize = 256
} else if (senderPacketSizeSlider < 0.5F) {
@@ -25,7 +25,8 @@ private val Log = Logger.getLogger("btbench.rfcomm-client")
class RfcommClient(private val viewModel: AppViewModel, val bluetoothAdapter: BluetoothAdapter) {
@SuppressLint("MissingPermission")
fun run() {
val remoteDevice = bluetoothAdapter.getRemoteDevice(viewModel.peerBluetoothAddress)
val address = viewModel.peerBluetoothAddress.take(17)
val remoteDevice = bluetoothAdapter.getRemoteDevice(address)
val socket = remoteDevice.createInsecureRfcommSocketToServiceRecord(
DEFAULT_RFCOMM_UUID
)
@@ -30,6 +30,6 @@ class RfcommServer(private val viewModel: AppViewModel, val bluetoothAdapter: Bl
)
val server = SocketServer(viewModel, serverSocket)
server.run({})
server.run({}, {})
}
}
@@ -22,6 +22,8 @@ import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.socket-client")
private const val DEFAULT_STARTUP_DELAY = 3000
class SocketClient(private val viewModel: AppViewModel, private val socket: BluetoothSocket) {
@SuppressLint("MissingPermission")
fun run() {
@@ -56,6 +58,10 @@ class SocketClient(private val viewModel: AppViewModel, private val socket: Blue
socketDataSource.receive()
}
Log.info("Startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("Starting to send")
sender.run()
cleanup()
}
@@ -22,14 +22,13 @@ import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.socket-server")
class SocketServer(private val viewModel: AppViewModel, private val serverSocket: BluetoothServerSocket) {
fun run(onTerminate: () -> Unit) {
fun run(onConnected: () -> Unit, onDisconnected: () -> Unit) {
var aborted = false
viewModel.running = true
fun cleanup() {
serverSocket.close()
viewModel.running = false
onTerminate()
}
thread(name = "SocketServer") {
@@ -38,6 +37,7 @@ class SocketServer(private val viewModel: AppViewModel, private val serverSocket
serverSocket.close()
}
Log.info("waiting for connection...")
onDisconnected()
val socket = try {
serverSocket.accept()
} catch (error: IOException) {
@@ -45,7 +45,8 @@ class SocketServer(private val viewModel: AppViewModel, private val serverSocket
cleanup()
return@thread
}
Log.info("got connection")
Log.info("got connection from ${socket.remoteDevice.address}")
onConnected()
viewModel.aborter = {
aborted = true
@@ -1,5 +1,5 @@
[versions]
agp = "8.3.0-alpha11"
agp = "8.2.0"
kotlin = "1.9.0"
core-ktx = "1.12.0"
junit = "4.13.2"
@@ -0,0 +1,57 @@
package com.github.google.bumble.remotehci
import java.io.IOException
class CommandLineInterface {
companion object {
fun printUsage() {
System.out.println("usage: <launch-command> [-h|--help] [<tcp-port>]")
}
@JvmStatic fun main(args: Array<String>) {
System.out.println("Starting proxy")
var tcpPort = DEFAULT_TCP_PORT
if (args.isNotEmpty()) {
if (args[0] == "-h" || args[0] == "--help") {
printUsage()
return
}
try {
tcpPort = args[0].toInt()
} catch (error: NumberFormatException) {
System.out.println("ERROR: invalid TCP port argument")
printUsage()
return
}
}
try {
val hciProxy = HciProxy(tcpPort, object : HciProxy.Listener {
override fun onHostConnectionState(connected: Boolean) {
}
override fun onHciPacketCountChange(
commandPacketsReceived: Int,
aclPacketsReceived: Int,
scoPacketsReceived: Int,
eventPacketsSent: Int,
aclPacketsSent: Int,
scoPacketsSent: Int
) {
}
override fun onMessage(message: String?) {
System.out.println(message)
}
})
hciProxy.run()
} catch (error: IOException) {
System.err.println("Exception while running HCI Server: $error")
} catch (error: HciProxy.HalException) {
System.err.println("HAL exception: ${error.message}")
}
}
}
}
@@ -10,8 +10,10 @@ import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.rememberScrollState
import androidx.compose.foundation.text.KeyboardActions
import androidx.compose.foundation.text.KeyboardOptions
import androidx.compose.foundation.verticalScroll
import androidx.compose.material3.Button
import androidx.compose.material3.Divider
import androidx.compose.material3.ExperimentalMaterial3Api
@@ -71,7 +73,7 @@ class AppViewModel : ViewModel(), HciProxy.Listener {
this.tcpPort = tcpPort
// Save the port to the preferences
with (preferences!!.edit()) {
with(preferences!!.edit()) {
putString(TCP_PORT_PREF_KEY, tcpPort.toString())
apply()
}
@@ -138,7 +140,8 @@ class MainActivity : ComponentActivity() {
log.warning("Exception while running HCI Server: $error")
} catch (error: HalException) {
log.warning("HAL exception: ${error.message}")
appViewModel.message = "Cannot bind to HAL (${error.message}). You may need to use the command 'setenforce 0' in a root adb shell."
appViewModel.message =
"Cannot bind to HAL (${error.message}). You may need to use the command 'setenforce 0' in a root adb shell."
}
log.info("HCI Proxy thread ended")
appViewModel.canStart = true
@@ -157,9 +160,12 @@ fun ActionButton(text: String, onClick: () -> Unit, enabled: Boolean) {
@Composable
fun MainView(appViewModel: AppViewModel, startProxy: () -> Unit) {
RemoteHCITheme {
// A surface container using the 'background' color from the theme
val scrollState = rememberScrollState()
Surface(
modifier = Modifier.fillMaxSize(), color = MaterialTheme.colorScheme.background
modifier = Modifier
.fillMaxSize()
.verticalScroll(scrollState),
color = MaterialTheme.colorScheme.background
) {
Column(modifier = Modifier.padding(horizontal = 16.dp)) {
Text(
@@ -174,13 +180,15 @@ fun MainView(appViewModel: AppViewModel, startProxy: () -> Unit) {
)
Divider()
val keyboardController = LocalSoftwareKeyboardController.current
TextField(
label = {
Text(text = "TCP Port")
},
TextField(label = {
Text(text = "TCP Port")
},
value = appViewModel.tcpPort.toString(),
modifier = Modifier.fillMaxWidth(),
keyboardOptions = KeyboardOptions.Default.copy(keyboardType = KeyboardType.Number, imeAction = ImeAction.Done),
keyboardOptions = KeyboardOptions.Default.copy(
keyboardType = KeyboardType.Number,
imeAction = ImeAction.Done
),
onValueChange = {
if (it.isNotEmpty()) {
val tcpPort = it.toIntOrNull()
@@ -189,10 +197,7 @@ fun MainView(appViewModel: AppViewModel, startProxy: () -> Unit) {
}
}
},
keyboardActions = KeyboardActions(
onDone = {keyboardController?.hide()}
)
)
keyboardActions = KeyboardActions(onDone = { keyboardController?.hide() }))
Divider()
val connectState = if (appViewModel.hostConnected) "CONNECTED" else "DISCONNECTED"
Text(
@@ -1,5 +1,5 @@
[versions]
agp = "8.3.0-alpha11"
agp = "8.2.0"
kotlin = "1.8.10"
core-ktx = "1.9.0"
junit = "4.13.2"
+4 -4
View File
@@ -1073,9 +1073,9 @@ checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "openssl"
version = "0.10.57"
version = "0.10.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c"
checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
dependencies = [
"bitflags 2.4.0",
"cfg-if",
@@ -1105,9 +1105,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.92"
version = "0.9.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db7e971c2c2bba161b2d2fdf37080177eff520b3bc044787c7f1f5f9e78d869b"
checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
dependencies = [
"cc",
"libc",
+2 -2
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ url = https://github.com/google/bumble
[options]
python_requires = >=3.8
packages = bumble, bumble.transport, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools
packages = bumble, bumble.transport, bumble.transport.grpc_protobuf, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools
package_dir =
bumble = bumble
bumble.apps = apps
+151
View File
@@ -0,0 +1,151 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import pytest
import logging
from bumble import device
from bumble.hci import CodecID, CodingFormat
from bumble.profiles.bap import (
AudioLocation,
SupportedFrameDuration,
SupportedSamplingFrequency,
CodecSpecificCapabilities,
ContextType,
PacRecord,
PublishedAudioCapabilitiesService,
PublishedAudioCapabilitiesServiceProxy,
)
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def test_codec_specific_capabilities() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED
AUDIO_CHANNEL_COUNTS = [1]
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
)
assert CodecSpecificCapabilities.from_bytes(bytes(cap)) == cap
# -----------------------------------------------------------------------------
def test_pac_record() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED
AUDIO_CHANNEL_COUNTS = [1]
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
)
pac_record = PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=cap,
metadata=b'',
)
assert PacRecord.from_bytes(bytes(pac_record)) == pac_record
# -----------------------------------------------------------------------------
def test_vendor_specific_pac_record() -> None:
# Vendor-Specific codec, Google, ID=0xFFFF. No capabilities and metadata.
RAW_DATA = bytes.fromhex('ffe000ffff0000')
assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():
devices = TwoDevices()
devices[0].add_service(
PublishedAudioCapabilitiesService(
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
supported_source_context=0,
available_source_context=0,
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,
),
),
],
sink_audio_locations=AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT,
)
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
pacs_client = await peer.discover_service_and_create_proxy(
PublishedAudioCapabilitiesServiceProxy
)
# -----------------------------------------------------------------------------
async def run():
await test_pacs()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
+74
View File
@@ -0,0 +1,74 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import pytest
import struct
import logging
from bumble import device
from bumble.profiles import csip
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_csis():
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
devices = TwoDevices()
devices[0].add_service(
csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK,
coordinated_set_size=2,
set_member_lock=csip.MemberLock.UNLOCKED,
set_member_rank=0,
)
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
csis_client = await peer.discover_service_and_create_proxy(
csip.CoordinatedSetIdentificationProxy
)
assert (
await csis_client.set_identity_resolving_key.read_value()
== bytes([csip.SirkType.PLAINTEXT]) + SIRK
)
assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2)
assert await csis_client.set_member_lock.read_value() == struct.pack(
'B', csip.MemberLock.UNLOCKED
)
assert await csis_client.set_member_rank.read_value() == struct.pack('B', 0)
# -----------------------------------------------------------------------------
async def run():
await test_csis()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
+79
View File
@@ -20,6 +20,7 @@ import logging
import os
import struct
import pytest
from unittest.mock import Mock, ANY
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
@@ -763,6 +764,83 @@ async def test_subscribe_notify():
assert not c3._called_3
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unsubscribe():
[client, server] = LinkedDevices().devices[:2]
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
characteristic2 = Characteristic(
'3234C4F4-3F34-4616-8935-45A50EE05DEB',
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
service1 = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[characteristic1, characteristic2],
)
server.add_services([service1])
mock1 = Mock()
characteristic1.on('subscription', mock1)
mock2 = Mock()
characteristic2.on('subscription', mock2)
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1
c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert len(c) == 1
c2 = c[0]
await c1.subscribe()
await async_barrier()
mock1.assert_called_once_with(ANY, True, False)
await c2.subscribe()
await async_barrier()
mock2.assert_called_once_with(ANY, True, False)
mock1.reset_mock()
await c1.unsubscribe()
await async_barrier()
mock1.assert_called_once_with(ANY, False, False)
mock2.reset_mock()
await c2.unsubscribe()
await async_barrier()
mock2.assert_called_once_with(ANY, False, False)
mock1.reset_mock()
await c1.unsubscribe()
await async_barrier()
mock1.assert_not_called()
mock2.reset_mock()
await c2.unsubscribe()
await async_barrier()
mock2.assert_not_called()
mock1.reset_mock()
await c1.unsubscribe(force=True)
await async_barrier()
mock1.assert_called_once_with(ANY, False, False)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
@@ -886,6 +964,7 @@ async def async_main():
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
await test_unsubscribe()
await test_characteristic_encoding()
await test_mtu_exchange()
+51
View File
@@ -24,12 +24,15 @@ from bumble.hci import (
HCI_RESET_COMMAND,
HCI_SUCCESS,
Address,
CodingFormat,
CodecID,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_CustomPacket,
HCI_Disconnect_Command,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Add_Device_To_Filter_Accept_List_Command,
HCI_LE_Advertising_Report_Event,
HCI_LE_Channel_Selection_Algorithm_Event,
@@ -51,6 +54,7 @@ from bumble.hci import (
HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command,
HCI_LE_Set_Scan_Parameters_Command,
HCI_LE_Setup_ISO_Data_Path_Command,
HCI_Number_Of_Completed_Packets_Event,
HCI_Packet,
HCI_PIN_Code_Request_Reply_Command,
@@ -442,6 +446,28 @@ def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Setup_ISO_Data_Path_Command():
command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000'))
assert command.connection_handle == 0x0060
assert command.data_path_direction == 0x00
assert command.data_path_id == 0x01
assert command.codec_id == CodingFormat(CodecID.TRANSPARENT)
assert command.controller_delay == 0
assert command.codec_configuration == b''
command = HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=0x0060,
data_path_direction=0x00,
data_path_id=0x01,
codec_id=CodingFormat(CodecID.TRANSPARENT),
controller_delay=0x00,
codec_configuration=b'',
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
@@ -461,6 +487,29 @@ def test_custom():
assert packet.payload == data
# -----------------------------------------------------------------------------
def test_iso_data_packet():
data = bytes.fromhex(
'05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697'
'52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
packet = HCI_IsoDataPacket.from_bytes(data)
assert packet.connection_handle == 0x0061
assert packet.packet_status_flag == 0
assert packet.pb_flag == 0x02
assert packet.ts_flag == 0x01
assert packet.data_total_length == 68
assert packet.time_stamp == 2716911914
assert packet.packet_sequence_number == 147
assert packet.iso_sdu_length == 60
assert packet.iso_sdu_fragment == bytes.fromhex(
'e83b477b00eba8d41dc018bf1a980f0290afe1e7c3765209669752b6a535a8df61e22931ef5a3'
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
assert packet.to_bytes() == data
# -----------------------------------------------------------------------------
def run_test_events():
test_HCI_Event()
@@ -499,6 +548,7 @@ def run_test_commands():
test_HCI_LE_Set_Default_PHY_Command()
test_HCI_LE_Set_Extended_Scan_Parameters_Command()
test_HCI_LE_Set_Extended_Advertising_Enable_Command()
test_HCI_LE_Setup_ISO_Data_Path_Command()
# -----------------------------------------------------------------------------
@@ -507,3 +557,4 @@ if __name__ == '__main__':
run_test_commands()
test_address()
test_custom()
test_iso_data_packet()
+3 -12
View File
@@ -21,7 +21,7 @@ import logging
import os
import pytest
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
@@ -38,7 +38,6 @@ from bumble.smp import (
OobLegacyContext,
)
from bumble.core import ProtocolError
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
from bumble.keys import PairingKeys
@@ -519,16 +518,8 @@ async def test_self_smp_over_classic():
# Mock connection
# TODO: Implement Classic SSP and encryption in link relayer
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
two_devices.devices[0].on_link_key(
two_devices.devices[1].public_address,
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.devices[1].on_link_key(
two_devices.devices[0].public_address,
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.devices[0].get_link_key = AsyncMock(return_value=LINK_KEY)
two_devices.devices[1].get_link_key = AsyncMock(return_value=LINK_KEY)
two_devices.connections[0].encryption = 1
two_devices.connections[1].encryption = 1
+68 -72
View File
@@ -16,6 +16,9 @@
# Imports
# -----------------------------------------------------------------------------
import pytest
from bumble import smp
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.pairing import OobData, OobSharedData, LeRole
from bumble.hci import Address
@@ -28,8 +31,8 @@ from bumble.core import AdvertisingData
# -----------------------------------------------------------------------------
def reversed_hex(hex_str):
return bytes(reversed(bytes.fromhex(hex_str)))
def reversed_hex(hex_str: str) -> bytes:
return bytes.fromhex(hex_str)[::-1]
# -----------------------------------------------------------------------------
@@ -129,112 +132,79 @@ def test_aes_cmac():
# -----------------------------------------------------------------------------
def test_f4():
u = bytes(
reversed(
bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
+ 'eff49111 acf4fddb cc030148 0e359de6'
)
)
u = reversed_hex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
)
v = bytes(
reversed(
bytes.fromhex(
'55188b3d 32f6bb9a 900afcfb eed4e72a'
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
)
v = reversed_hex(
'55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
z = bytes([0])
x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
z = b'\0'
value = f4(u, v, x, z)
assert bytes(reversed(value)) == bytes.fromhex(
'f2c916f1 07a9bd1c f1eda1be a974872d'
)
assert value == reversed_hex('f2c916f1 07a9bd1c f1eda1be a974872d')
# -----------------------------------------------------------------------------
def test_f5():
w = bytes(
reversed(
bytes.fromhex(
'ec0234a3 57c8ad05 341010a6 0a397d9b'
+ '99796b13 b4f866f1 868d34f3 73bfa698'
)
)
w = reversed_hex(
'ec0234a3 57c8ad05 341010a6 0a397d9b 99796b13 b4f866f1 868d34f3 73bfa698'
)
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce')))
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1')))
n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
a1 = reversed_hex('00561237 37bfce')
a2 = reversed_hex('00a71370 2dcfc1')
value = f5(w, n1, n2, a1, a2)
assert bytes(reversed(value[0])) == bytes.fromhex(
'2965f176 a1084a02 fd3f6a20 ce636e20'
)
assert bytes(reversed(value[1])) == bytes.fromhex(
'69867911 69d7cd23 980522b5 94750a38'
)
assert value[0] == reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
assert value[1] == reversed_hex('69867911 69d7cd23 980522b5 94750a38')
# -----------------------------------------------------------------------------
def test_f6():
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
mac_key = bytes(reversed(bytes.fromhex('2965f176 a1084a02 fd3f6a20 ce636e20')))
r = bytes(reversed(bytes.fromhex('12a3343b b453bb54 08da42d2 0c2d0fc8')))
io_cap = bytes(reversed(bytes.fromhex('010102')))
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce')))
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1')))
n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
mac_key = reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
r = reversed_hex('12a3343b b453bb54 08da42d2 0c2d0fc8')
io_cap = reversed_hex('010102')
a1 = reversed_hex('00561237 37bfce')
a2 = reversed_hex('00a71370 2dcfc1')
value = f6(mac_key, n1, n2, r, io_cap, a1, a2)
assert bytes(reversed(value)) == bytes.fromhex(
'e3c47398 9cd0e8c5 d26c0b09 da958f61'
)
assert value == reversed_hex('e3c47398 9cd0e8c5 d26c0b09 da958f61')
# -----------------------------------------------------------------------------
def test_g2():
u = bytes(
reversed(
bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
+ 'eff49111 acf4fddb cc030148 0e359de6'
)
)
u = reversed_hex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
)
v = bytes(
reversed(
bytes.fromhex(
'55188b3d 32f6bb9a 900afcfb eed4e72a'
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
)
v = reversed_hex(
'55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
y = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
y = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
value = g2(u, v, x, y)
assert value == 0x2F9ED5BA
# -----------------------------------------------------------------------------
def test_h6():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY_ID = bytes.fromhex('6c656272')
assert h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399')
assert h6(KEY, KEY_ID) == reversed_hex('2d9ae102 e76dc91c e8d3a9e2 80b16399')
# -----------------------------------------------------------------------------
def test_h7():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
SALT = bytes.fromhex('00000000 00000000 00000000 746D7031')
assert h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011')
assert h7(SALT, KEY) == reversed_hex('fb173597 c6a3c0ec d2998c2a 75a57011')
# -----------------------------------------------------------------------------
def test_ah():
irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')))
prand = bytes(reversed(bytes.fromhex('708194')))
irk = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
prand = reversed_hex('708194')
value = ah(irk, prand)
expected = bytes(reversed(bytes.fromhex('0dfbaa')))
expected = reversed_hex('0dfbaa')
assert value == expected
@@ -243,7 +213,7 @@ def test_oob_data():
oob_data = OobData(
address=Address("F0:F1:F2:F3:F4:F5"),
role=LeRole.BOTH_PERIPHERAL_PREFERRED,
shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])),
shared_data=OobSharedData(c=b'12', r=b'34'),
)
oob_data_ad = oob_data.to_ad()
oob_data_bytes = bytes(oob_data_ad)
@@ -255,6 +225,32 @@ def test_oob_data():
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'ct2, expected',
[
(False, 'bc1ca4ef 633fc1bd 0d8230af ee388fb0'),
(True, '287ad379 dca40253 0a39f1f4 3047b835'),
],
)
def test_ltk_to_link_key(ct2: bool, expected: str):
LTK = reversed_hex('368df9bc e3264b58 bd066c33 334fbf64')
assert smp.Session.derive_link_key(LTK, ct2) == reversed_hex(expected)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'ct2, expected',
[
(False, 'a813fb72 f1a3dfa1 8a2c9a43 f10d0a30'),
(True, 'e85e09eb 5eccb3e2 69418a13 3211bc79'),
],
)
def test_link_key_to_ltk(ct2: bool, expected: str):
LINK_KEY = reversed_hex('05040302 01000908 07060504 03020100')
assert smp.Session.derive_ltk(LINK_KEY, ct2) == reversed_hex(expected)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_ecc()
+3
View File
@@ -71,3 +71,6 @@ class TwoDevices:
# Check the post conditions
assert self.connections[0] is not None
assert self.connections[1] is not None
def __getitem__(self, index: int) -> Device:
return self.devices[index]