Compare commits

...

38 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 8400ff0802 shared usage printer 2023-12-04 00:37:28 -08:00
Gilles Boccon-Gibod 0ed6aa230b address PR comment 2023-12-04 00:32:04 -08:00
Gilles Boccon-Gibod 72d5360af9 keep projects compatible with Android Studio Hedgehog 2023-12-03 18:06:54 -08:00
Gilles Boccon-Gibod ac3961e763 add doc 2023-12-03 17:50:42 -08:00
Gilles Boccon-Gibod 8385035400 add CLI support 2023-12-03 16:35:14 -08:00
zxzxwu 247cb89332 Merge pull request #358 from zxzxwu/coding2
Add variable-length bytes field
2023-12-01 03:26:38 +08:00
Josh Wu 3fc71a0266 Add variable-length bytes field 2023-12-01 03:16:52 +08:00
zxzxwu 392dcc3a05 Merge pull request #357 from zxzxwu/coding
Refactor CodingFormat
2023-12-01 03:15:33 +08:00
Josh Wu f27015d1b7 Refactor CodingFormat
As CodingFormat is now used by HFP and LEA, and vendor specific codecs
are introduced, this object needs to provide more information.
2023-12-01 02:58:09 +08:00
zxzxwu 86a19b41aa Merge pull request #344 from zxzxwu/cis
CIS and SCO responder support
2023-11-30 21:00:55 +08:00
Gilles Boccon-Gibod 320164d476 Merge pull request #355 from google/gbg/fix-gatt-unsubscribe
fix #354 (gatt unsubscribe)
2023-11-29 22:28:57 -08:00
Josh Wu 40ae661ee5 More SCO support and warnings and typo fix 2023-11-30 12:59:43 +08:00
Josh Wu c5def93bb8 CIS and SCO responder support 2023-11-30 12:16:40 +08:00
zxzxwu a9c4c5833d Merge pull request #350 from zxzxwu/csip
Add Coordinated Set Identification Service(CSIS)
2023-11-30 12:15:56 +08:00
Gilles Boccon-Gibod 58c9c4f590 fix #354 2023-11-29 19:19:40 -08:00
zxzxwu 24524d88cb Merge pull request #342 from zxzxwu/typing
Typing helper
2023-11-30 00:21:44 +08:00
zxzxwu b8849ab311 Merge pull request #349 from zxzxwu/stack
Log track back in on_packet
2023-11-30 00:20:20 +08:00
Josh Wu f3cd8f8ed0 Typing helper 2023-11-29 21:24:27 +08:00
zxzxwu 2b26de3f3a Merge pull request #348 from zxzxwu/gattc
Typing GATT Client and Device Peer
2023-11-29 15:09:40 +08:00
Josh Wu 0149c4c212 Log track back in on_packet
Many errors are raised in on_packet() callbacks, but currently it only
provides a very brief error message.
2023-11-29 15:01:15 +08:00
Gilles Boccon-Gibod f2ed898784 Merge pull request #352 from google/gbg/more-gatt-uuids
add a few uuids
2023-11-28 22:44:40 -08:00
Josh Wu 464a476f9f Add CSIP 2023-11-29 14:09:31 +08:00
Gilles Boccon-Gibod e85d067fb5 add a few uuids 2023-11-28 20:02:00 -08:00
Josh Wu 04d5bf3afc Typing GATT Client and Device Peer 2023-11-28 21:57:57 +08:00
zxzxwu a13e193d3b Merge pull request #343 from zxzxwu/lea-gatt
Add LE Audio GATT services and characteristics definitions
2023-11-28 10:34:39 +08:00
Gilles Boccon-Gibod 28a1a5ebc2 Merge pull request #347 from akuker/main
Include transport.grpc_protobuf in the setup package.
2023-11-27 15:23:17 -08:00
Tony Kuker 6310dc777f Include transport.grpc_protobuf in the setup package. 2023-11-27 16:48:37 -06:00
Josh Wu 863de18877 Add LE Audio GATT definitions 2023-11-27 17:53:00 +08:00
zxzxwu f0e5cdee1a Merge pull request #339 from zxzxwu/enc
Refactor crypto and fix CTKD
2023-11-27 14:05:37 +08:00
zxzxwu 7bc7d0f5af Merge pull request #334 from zxzxwu/extadv
Add support for LE Extended Advertising
2023-11-27 14:01:31 +08:00
Josh Wu a65a215fd7 Provide IntFlag.name property fallback 2023-11-26 19:42:22 +08:00
Josh Wu 80d34a226d Slightly refactor and fix CTKD
It seems sample input data provided in the spec is big-endian (just
like other AES-CMAC-based functions), but all keys are in little-endian(
HCI standard), so they need to be reverse before and after applying
AES-CMAC.
2023-11-26 16:55:10 +08:00
Josh Wu a9628f73e3 Add support for Extended Advertising 2023-11-26 15:03:09 +08:00
Lucas Abel 9bf2e03354 device: set authenticated and sc state on AES encryption change 2023-11-23 06:39:55 +01:00
Gilles Boccon-Gibod 2900b93bb3 Merge pull request #120 from google/gbg/usb-cleanup
minor cleanup of the internals of the usb transport implementation
2023-11-22 17:18:23 -08:00
Gilles Boccon-Gibod 284cc8a321 Merge pull request #326 from google/gbg/android-benchmark-app
Android benchmarking app
2023-11-22 15:39:52 -08:00
Gilles Boccon-Gibod 9c7089c8ff terminate when unplugged 2023-11-19 11:36:38 -08:00
Gilles Boccon-Gibod a8ec1b0949 minor cleanup of the internals of the usb transport implementation 2023-11-15 17:26:21 -08:00
30 changed files with 2003 additions and 518 deletions
+6
View File
@@ -21,7 +21,9 @@
"cccds", "cccds",
"cmac", "cmac",
"CONNECTIONLESS", "CONNECTIONLESS",
"csip",
"csrcs", "csrcs",
"CVSD",
"datagram", "datagram",
"DATALINK", "DATALINK",
"delayreport", "delayreport",
@@ -29,6 +31,7 @@
"deregistration", "deregistration",
"dhkey", "dhkey",
"diversifier", "diversifier",
"endianness",
"Fitbit", "Fitbit",
"GATTLINK", "GATTLINK",
"HANDSFREE", "HANDSFREE",
@@ -38,12 +41,14 @@
"libc", "libc",
"libusb", "libusb",
"MITM", "MITM",
"MSBC",
"NDIS", "NDIS",
"netsim", "netsim",
"NONBLOCK", "NONBLOCK",
"NONCONN", "NONCONN",
"OXIMETER", "OXIMETER",
"popleft", "popleft",
"PRAND",
"protobuf", "protobuf",
"psms", "psms",
"pyee", "pyee",
@@ -55,6 +60,7 @@
"SEID", "SEID",
"seids", "seids",
"SERV", "SERV",
"SIRK",
"ssrc", "ssrc",
"strerror", "strerror",
"subband", "subband",
+82 -66
View File
@@ -21,6 +21,8 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import logging import logging
import operator import operator
@@ -29,11 +31,13 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric.ec import ( from cryptography.hazmat.primitives.asymmetric.ec import (
generate_private_key, generate_private_key,
ECDH, ECDH,
EllipticCurvePrivateKey,
EllipticCurvePublicNumbers, EllipticCurvePublicNumbers,
EllipticCurvePrivateNumbers, EllipticCurvePrivateNumbers,
SECP256R1, SECP256R1,
) )
from cryptography.hazmat.primitives import cmac from cryptography.hazmat.primitives import cmac
from typing import Tuple
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -46,16 +50,18 @@ logger = logging.getLogger(__name__)
# Classes # Classes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class EccKey: class EccKey:
def __init__(self, private_key): def __init__(self, private_key: EllipticCurvePrivateKey) -> None:
self.private_key = private_key self.private_key = private_key
@classmethod @classmethod
def generate(cls): def generate(cls) -> EccKey:
private_key = generate_private_key(SECP256R1()) private_key = generate_private_key(SECP256R1())
return cls(private_key) return cls(private_key)
@classmethod @classmethod
def from_private_key_bytes(cls, d_bytes, x_bytes, y_bytes): def from_private_key_bytes(
cls, d_bytes: bytes, x_bytes: bytes, y_bytes: bytes
) -> EccKey:
d = int.from_bytes(d_bytes, byteorder='big', signed=False) d = int.from_bytes(d_bytes, byteorder='big', signed=False)
x = int.from_bytes(x_bytes, byteorder='big', signed=False) x = int.from_bytes(x_bytes, byteorder='big', signed=False)
y = int.from_bytes(y_bytes, byteorder='big', signed=False) y = int.from_bytes(y_bytes, byteorder='big', signed=False)
@@ -65,7 +71,7 @@ class EccKey:
return cls(private_key) return cls(private_key)
@property @property
def x(self): def x(self) -> bytes:
return ( return (
self.private_key.public_key() self.private_key.public_key()
.public_numbers() .public_numbers()
@@ -73,14 +79,14 @@ class EccKey:
) )
@property @property
def y(self): def y(self) -> bytes:
return ( return (
self.private_key.public_key() self.private_key.public_key()
.public_numbers() .public_numbers()
.y.to_bytes(32, byteorder='big') .y.to_bytes(32, byteorder='big')
) )
def dh(self, public_key_x, public_key_y): def dh(self, public_key_x: bytes, public_key_y: bytes) -> bytes:
x = int.from_bytes(public_key_x, byteorder='big', signed=False) x = int.from_bytes(public_key_x, byteorder='big', signed=False)
y = int.from_bytes(public_key_y, byteorder='big', signed=False) y = int.from_bytes(public_key_y, byteorder='big', signed=False)
public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key() public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key()
@@ -93,14 +99,23 @@ class EccKey:
# Functions # Functions
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def xor(x, y): def xor(x: bytes, y: bytes) -> bytes:
assert len(x) == len(y) assert len(x) == len(y)
return bytes(map(operator.xor, x, y)) return bytes(map(operator.xor, x, y))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def r(): def reverse(input: bytes) -> bytes:
'''
Returns bytes of input in reversed endianness.
'''
return input[::-1]
# -----------------------------------------------------------------------------
def r() -> bytes:
''' '''
Generate 16 bytes of random data Generate 16 bytes of random data
''' '''
@@ -108,20 +123,20 @@ def r():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def e(key, data): def e(key: bytes, data: bytes) -> bytes:
''' '''
AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output. AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output.
See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e
''' '''
cipher = Cipher(algorithms.AES(bytes(reversed(key))), modes.ECB()) cipher = Cipher(algorithms.AES(reverse(key)), modes.ECB())
encryptor = cipher.encryptor() encryptor = cipher.encryptor()
return bytes(reversed(encryptor.update(bytes(reversed(data))))) return reverse(encryptor.update(reverse(data)))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def ah(k, r): # pylint: disable=redefined-outer-name def ah(k: bytes, r: bytes) -> bytes: # pylint: disable=redefined-outer-name
''' '''
See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah
''' '''
@@ -132,7 +147,16 @@ def ah(k, r): # pylint: disable=redefined-outer-name
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-name def c1(
k: bytes,
r: bytes,
preq: bytes,
pres: bytes,
iat: int,
rat: int,
ia: bytes,
ra: bytes,
) -> bytes: # pylint: disable=redefined-outer-name
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for
LE Legacy Pairing LE Legacy Pairing
@@ -144,7 +168,7 @@ def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-n
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def s1(k, r1, r2): def s1(k: bytes, r1: bytes, r2: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy
Pairing Pairing
@@ -154,7 +178,7 @@ def s1(k, r1, r2):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def aes_cmac(m, k): def aes_cmac(m: bytes, k: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC
@@ -166,20 +190,16 @@ def aes_cmac(m, k):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def f4(u, v, x, z): def f4(u: bytes, v: bytes, x: bytes, z: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value
Generation Function f4 Generation Function f4
''' '''
return bytes( return reverse(aes_cmac(reverse(u) + reverse(v) + z, reverse(x)))
reversed(
aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + z, bytes(reversed(x)))
)
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def f5(w, n1, n2, a1, a2): def f5(w: bytes, n1: bytes, n2: bytes, a1: bytes, a2: bytes) -> Tuple[bytes, bytes]:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation
Function f5 Function f5
@@ -187,87 +207,83 @@ def f5(w, n1, n2, a1, a2):
NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order
''' '''
salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE') salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE')
t = aes_cmac(bytes(reversed(w)), salt) t = aes_cmac(reverse(w), salt)
key_id = bytes([0x62, 0x74, 0x6C, 0x65]) key_id = bytes([0x62, 0x74, 0x6C, 0x65])
return ( return (
bytes( reverse(
reversed( aes_cmac(
aes_cmac( bytes([0])
bytes([0]) + key_id
+ key_id + reverse(n1)
+ bytes(reversed(n1)) + reverse(n2)
+ bytes(reversed(n2)) + reverse(a1)
+ bytes(reversed(a1)) + reverse(a2)
+ bytes(reversed(a2)) + bytes([1, 0]),
+ bytes([1, 0]), t,
t,
)
) )
), ),
bytes( reverse(
reversed( aes_cmac(
aes_cmac( bytes([1])
bytes([1]) + key_id
+ key_id + reverse(n1)
+ bytes(reversed(n1)) + reverse(n2)
+ bytes(reversed(n2)) + reverse(a1)
+ bytes(reversed(a1)) + reverse(a2)
+ bytes(reversed(a2)) + bytes([1, 0]),
+ bytes([1, 0]), t,
t,
)
) )
), ),
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def f6(w, n1, n2, r, io_cap, a1, a2): # pylint: disable=redefined-outer-name def f6(
w: bytes, n1: bytes, n2: bytes, r: bytes, io_cap: bytes, a1: bytes, a2: bytes
) -> bytes: # pylint: disable=redefined-outer-name
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value
Generation Function f6 Generation Function f6
''' '''
return bytes( return reverse(
reversed( aes_cmac(
aes_cmac( reverse(n1)
bytes(reversed(n1)) + reverse(n2)
+ bytes(reversed(n2)) + reverse(r)
+ bytes(reversed(r)) + reverse(io_cap)
+ bytes(reversed(io_cap)) + reverse(a1)
+ bytes(reversed(a1)) + reverse(a2),
+ bytes(reversed(a2)), reverse(w),
bytes(reversed(w)),
)
) )
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def g2(u, v, x, y): def g2(u: bytes, v: bytes, x: bytes, y: bytes) -> int:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison
Value Generation Function g2 Value Generation Function g2
''' '''
return int.from_bytes( return int.from_bytes(
aes_cmac( aes_cmac(
bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)), reverse(u) + reverse(v) + reverse(y),
bytes(reversed(x)), reverse(x),
)[-4:], )[-4:],
byteorder='big', byteorder='big',
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def h6(w, key_id): def h6(w: bytes, key_id: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6 See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
''' '''
return aes_cmac(key_id, w) return reverse(aes_cmac(key_id, reverse(w)))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def h7(salt, w): def h7(salt: bytes, w: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7 See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
''' '''
return aes_cmac(w, salt) return reverse(aes_cmac(reverse(w), salt))
+577 -48
View File
@@ -21,8 +21,9 @@ import functools
import json import json
import asyncio import asyncio
import logging import logging
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack, closing
from dataclasses import dataclass from dataclasses import dataclass
from collections.abc import Iterable
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@@ -32,6 +33,8 @@ from typing import (
Optional, Optional,
Tuple, Tuple,
Type, Type,
TypeVar,
Set,
Union, Union,
cast, cast,
overload, overload,
@@ -46,6 +49,7 @@ from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE, HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING, HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_YES_NO_IO_CAPABILITY, HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY, HCI_DISPLAY_ONLY_IO_CAPABILITY,
@@ -82,31 +86,44 @@ from .hci import (
HCI_Constant, HCI_Constant,
HCI_Create_Connection_Cancel_Command, HCI_Create_Connection_Cancel_Command,
HCI_Create_Connection_Command, HCI_Create_Connection_Command,
HCI_Create_Connection_Command,
HCI_Disconnect_Command, HCI_Disconnect_Command,
HCI_Encryption_Change_Event, HCI_Encryption_Change_Event,
HCI_Error, HCI_Error,
HCI_IO_Capability_Request_Reply_Command, HCI_IO_Capability_Request_Reply_Command,
HCI_Inquiry_Cancel_Command, HCI_Inquiry_Cancel_Command,
HCI_Inquiry_Command, HCI_Inquiry_Command,
HCI_IsoDataPacket,
HCI_LE_Accept_CIS_Request_Command,
HCI_LE_Add_Device_To_Resolving_List_Command, HCI_LE_Add_Device_To_Resolving_List_Command,
HCI_LE_Advertising_Report_Event, HCI_LE_Advertising_Report_Event,
HCI_LE_Clear_Resolving_List_Command, HCI_LE_Clear_Resolving_List_Command,
HCI_LE_Connection_Update_Command, HCI_LE_Connection_Update_Command,
HCI_LE_Create_Connection_Cancel_Command, HCI_LE_Create_Connection_Cancel_Command,
HCI_LE_Create_Connection_Command, HCI_LE_Create_Connection_Command,
HCI_LE_Create_CIS_Command,
HCI_LE_Enable_Encryption_Command, HCI_LE_Enable_Encryption_Command,
HCI_LE_Extended_Advertising_Report_Event, HCI_LE_Extended_Advertising_Report_Event,
HCI_LE_Extended_Create_Connection_Command, HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command, HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command, HCI_LE_Read_PHY_Command,
HCI_LE_Reject_CIS_Request_Command,
HCI_LE_Remove_Advertising_Set_Command,
HCI_LE_Set_Address_Resolution_Enable_Command, HCI_LE_Set_Address_Resolution_Enable_Command,
HCI_LE_Set_Advertising_Data_Command, HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Enable_Command, HCI_LE_Set_Advertising_Enable_Command,
HCI_LE_Set_Advertising_Parameters_Command, HCI_LE_Set_Advertising_Parameters_Command,
HCI_LE_Set_Advertising_Set_Random_Address_Command,
HCI_LE_Set_CIG_Parameters_Command,
HCI_LE_Set_Data_Length_Command, HCI_LE_Set_Data_Length_Command,
HCI_LE_Set_Default_PHY_Command, HCI_LE_Set_Default_PHY_Command,
HCI_LE_Set_Extended_Scan_Enable_Command, HCI_LE_Set_Extended_Scan_Enable_Command,
HCI_LE_Set_Extended_Scan_Parameters_Command, HCI_LE_Set_Extended_Scan_Parameters_Command,
HCI_LE_Set_Extended_Scan_Response_Data_Command,
HCI_LE_Set_Extended_Advertising_Data_Command,
HCI_LE_Set_Extended_Advertising_Enable_Command,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
HCI_LE_Set_Host_Feature_Command,
HCI_LE_Set_PHY_Command, HCI_LE_Set_PHY_Command,
HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command, HCI_LE_Set_Scan_Enable_Command,
@@ -121,6 +138,7 @@ from .hci import (
HCI_Switch_Role_Command, HCI_Switch_Role_Command,
HCI_Set_Connection_Encryption_Command, HCI_Set_Connection_Encryption_Command,
HCI_StatusError, HCI_StatusError,
HCI_SynchronousDataPacket,
HCI_User_Confirmation_Request_Negative_Reply_Command, HCI_User_Confirmation_Request_Negative_Reply_Command,
HCI_User_Confirmation_Request_Reply_Command, HCI_User_Confirmation_Request_Reply_Command,
HCI_User_Passkey_Request_Negative_Reply_Command, HCI_User_Passkey_Request_Negative_Reply_Command,
@@ -152,9 +170,11 @@ from .core import (
from .utils import ( from .utils import (
AsyncRunner, AsyncRunner,
CompositeEventEmitter, CompositeEventEmitter,
EventWatcher,
setup_event_forwarding, setup_event_forwarding,
composite_listener, composite_listener,
deprecated, deprecated,
experimental,
) )
from .keys import ( from .keys import (
KeyStore, KeyStore,
@@ -189,6 +209,8 @@ DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240 DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_MIN_LE_RSSI = -127 DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20 DEVICE_MAX_LE_RSSI = 20
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00' DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
@@ -430,8 +452,11 @@ class LePhyOptions:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
_PROXY_CLASS = TypeVar('_PROXY_CLASS', bound=gatt_client.ProfileServiceProxy)
class Peer: class Peer:
def __init__(self, connection): def __init__(self, connection: Connection) -> None:
self.connection = connection self.connection = connection
# Create a GATT client for the connection # Create a GATT client for the connection
@@ -439,77 +464,113 @@ class Peer:
connection.gatt_client = self.gatt_client connection.gatt_client = self.gatt_client
@property @property
def services(self): def services(self) -> List[gatt_client.ServiceProxy]:
return self.gatt_client.services return self.gatt_client.services
async def request_mtu(self, mtu): async def request_mtu(self, mtu: int) -> int:
mtu = await self.gatt_client.request_mtu(mtu) mtu = await self.gatt_client.request_mtu(mtu)
self.connection.emit('connection_att_mtu_update') self.connection.emit('connection_att_mtu_update')
return mtu return mtu
async def discover_service(self, uuid): async def discover_service(
self, uuid: Union[core.UUID, str]
) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_service(uuid) return await self.gatt_client.discover_service(uuid)
async def discover_services(self, uuids=()): async def discover_services(
self, uuids: Iterable[core.UUID] = ()
) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_services(uuids) return await self.gatt_client.discover_services(uuids)
async def discover_included_services(self, service): async def discover_included_services(
self, service: gatt_client.ServiceProxy
) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_included_services(service) return await self.gatt_client.discover_included_services(service)
async def discover_characteristics(self, uuids=(), service=None): async def discover_characteristics(
self,
uuids: Iterable[Union[core.UUID, str]] = (),
service: Optional[gatt_client.ServiceProxy] = None,
) -> List[gatt_client.CharacteristicProxy]:
return await self.gatt_client.discover_characteristics( return await self.gatt_client.discover_characteristics(
uuids=uuids, service=service uuids=uuids, service=service
) )
async def discover_descriptors( async def discover_descriptors(
self, characteristic=None, start_handle=None, end_handle=None self,
characteristic: Optional[gatt_client.CharacteristicProxy] = None,
start_handle: Optional[int] = None,
end_handle: Optional[int] = None,
): ):
return await self.gatt_client.discover_descriptors( return await self.gatt_client.discover_descriptors(
characteristic, start_handle, end_handle characteristic, start_handle, end_handle
) )
async def discover_attributes(self): async def discover_attributes(self) -> List[gatt_client.AttributeProxy]:
return await self.gatt_client.discover_attributes() return await self.gatt_client.discover_attributes()
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True): async def subscribe(
self,
characteristic: gatt_client.CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
prefer_notify: bool = True,
) -> None:
return await self.gatt_client.subscribe( return await self.gatt_client.subscribe(
characteristic, subscriber, prefer_notify characteristic, subscriber, prefer_notify
) )
async def unsubscribe(self, characteristic, subscriber=None): async def unsubscribe(
self,
characteristic: gatt_client.CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
) -> None:
return await self.gatt_client.unsubscribe(characteristic, subscriber) return await self.gatt_client.unsubscribe(characteristic, subscriber)
async def read_value(self, attribute): async def read_value(
self, attribute: Union[int, gatt_client.AttributeProxy]
) -> bytes:
return await self.gatt_client.read_value(attribute) return await self.gatt_client.read_value(attribute)
async def write_value(self, attribute, value, with_response=False): async def write_value(
self,
attribute: Union[int, gatt_client.AttributeProxy],
value: bytes,
with_response: bool = False,
) -> None:
return await self.gatt_client.write_value(attribute, value, with_response) return await self.gatt_client.write_value(attribute, value, with_response)
async def read_characteristics_by_uuid(self, uuid, service=None): async def read_characteristics_by_uuid(
self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
) -> List[bytes]:
return await self.gatt_client.read_characteristics_by_uuid(uuid, service) return await self.gatt_client.read_characteristics_by_uuid(uuid, service)
def get_services_by_uuid(self, uuid): def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]:
return self.gatt_client.get_services_by_uuid(uuid) return self.gatt_client.get_services_by_uuid(uuid)
def get_characteristics_by_uuid(self, uuid, service=None): def get_characteristics_by_uuid(
self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
) -> List[gatt_client.CharacteristicProxy]:
return self.gatt_client.get_characteristics_by_uuid(uuid, service) return self.gatt_client.get_characteristics_by_uuid(uuid, service)
def create_service_proxy(self, proxy_class): def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS:
return proxy_class.from_client(self.gatt_client) return cast(_PROXY_CLASS, proxy_class.from_client(self.gatt_client))
async def discover_service_and_create_proxy(self, proxy_class): async def discover_service_and_create_proxy(
self, proxy_class: Type[_PROXY_CLASS]
) -> Optional[_PROXY_CLASS]:
# Discover the first matching service and its characteristics # Discover the first matching service and its characteristics
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID) services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
if services: if services:
service = services[0] service = services[0]
await service.discover_characteristics() await service.discover_characteristics()
return self.create_service_proxy(proxy_class) return self.create_service_proxy(proxy_class)
return None
async def sustain(self, timeout=None): async def sustain(self, timeout: Optional[float] = None) -> None:
await self.connection.sustain(timeout) await self.connection.sustain(timeout)
# [Classic only] # [Classic only]
async def request_name(self): async def request_name(self) -> str:
return await self.connection.request_remote_name() return await self.connection.request_remote_name()
async def __aenter__(self): async def __aenter__(self):
@@ -522,7 +583,7 @@ class Peer:
async def __aexit__(self, exc_type, exc_value, traceback): async def __aexit__(self, exc_type, exc_value, traceback):
pass pass
def __str__(self): def __str__(self) -> str:
return f'{self.connection.peer_address} as {self.connection.role_name}' return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -541,6 +602,46 @@ class ConnectionParametersPreferences:
ConnectionParametersPreferences.default = ConnectionParametersPreferences() ConnectionParametersPreferences.default = ConnectionParametersPreferences()
# -----------------------------------------------------------------------------
@dataclass
class ScoLink(CompositeEventEmitter):
device: Device
acl_connection: Connection
handle: int
link_type: int
def __post_init__(self):
super().__init__()
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
# -----------------------------------------------------------------------------
@dataclass
class CisLink(CompositeEventEmitter):
class State(IntEnum):
PENDING = 0
ESTABLISHED = 1
device: Device
acl_connection: Connection # Based ACL connection
handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
cis_id: int # CIS ID assigned by Central device
cig_id: int # CIG ID assigned by Central device
state: State = State.PENDING
def __post_init__(self):
super().__init__()
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter): class Connection(CompositeEventEmitter):
device: Device device: Device
@@ -722,7 +823,7 @@ class Connection(CompositeEventEmitter):
async def switch_role(self, role: int) -> None: async def switch_role(self, role: int) -> None:
return await self.device.switch_role(self, role) return await self.device.switch_role(self, role)
async def sustain(self, timeout=None): async def sustain(self, timeout: Optional[float] = None) -> None:
"""Idles the current task waiting for a disconnect or timeout""" """Idles the current task waiting for a disconnect or timeout"""
abort = asyncio.get_running_loop().create_future() abort = asyncio.get_running_loop().create_future()
@@ -819,6 +920,7 @@ class DeviceConfiguration:
self.keystore = None self.keystore = None
self.gatt_services: List[Dict[str, Any]] = [] self.gatt_services: List[Dict[str, Any]] = []
self.address_resolution_offload = False self.address_resolution_offload = False
self.cis_enabled = False
def load_from_dict(self, config: Dict[str, Any]) -> None: def load_from_dict(self, config: Dict[str, Any]) -> None:
# Load simple properties # Load simple properties
@@ -854,6 +956,7 @@ class DeviceConfiguration:
self.address_resolution_offload = config.get( self.address_resolution_offload = config.get(
'address_resolution_offload', self.address_resolution_offload 'address_resolution_offload', self.address_resolution_offload
) )
self.cis_enabled = config.get('cis_enabled', self.cis_enabled)
# Load or synthesize an IRK # Load or synthesize an IRK
irk = config.get('irk') irk = config.get('irk')
@@ -960,6 +1063,10 @@ class Device(CompositeEventEmitter):
] ]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration config: DeviceConfiguration
extended_advertising_handles: Set[int]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]]
@composite_listener @composite_listener
class Listener: class Listener:
@@ -1052,12 +1159,16 @@ class Device(CompositeEventEmitter):
self.disconnecting = False self.disconnecting = False
self.connections = {} # Connections, by connection handle self.connections = {} # Connections, by connection handle
self.pending_connections = {} # Connections, by BD address (BR/EDR only) self.pending_connections = {} # Connections, by BD address (BR/EDR only)
self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only)
self.cis_links = {} # CisLinks, by connection handle (LE only)
self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle
self.classic_enabled = False self.classic_enabled = False
self.inquiry_response = None self.inquiry_response = None
self.address_resolver = None self.address_resolver = None
self.classic_pending_accepts = { self.classic_pending_accepts = {
Address.ANY: [] Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY } # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set()
# Own address type cache # Own address type cache
self.advertising_own_address_type = None self.advertising_own_address_type = None
@@ -1080,6 +1191,7 @@ class Device(CompositeEventEmitter):
self.le_enabled = config.le_enabled self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.cis_enabled = config.cis_enabled
self.classic_sc_enabled = config.classic_sc_enabled self.classic_sc_enabled = config.classic_sc_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_smp_enabled = config.classic_smp_enabled self.classic_smp_enabled = config.classic_smp_enabled
@@ -1390,6 +1502,16 @@ class Device(CompositeEventEmitter):
) # type: ignore[call-arg] ) # type: ignore[call-arg]
) )
if self.cis_enabled:
await self.send_command(
HCI_LE_Set_Host_Feature_Command( # type: ignore[call-arg]
bit_number=(
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE
),
bit_value=1,
)
)
if self.classic_enabled: if self.classic_enabled:
await self.send_command( await self.send_command(
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg] HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg]
@@ -1536,6 +1658,149 @@ class Device(CompositeEventEmitter):
self.advertising = False self.advertising = False
self.auto_restart_advertising = False self.auto_restart_advertising = False
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising(
self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None,
advertising_data: Optional[bytes] = None,
) -> int:
"""Starts an extended advertising set.
Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
Returns:
Handle of the new advertising set.
"""
adv_handle = -1
# Find a free handle
for i in range(
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
):
if i not in self.extended_advertising_handles:
adv_handle = i
break
if adv_handle == -1:
raise InvalidStateError('No available advertising set.')
try:
# Set the advertising parameters
await self.send_command(
HCI_LE_Set_Extended_Advertising_Parameters_Command(
advertising_handle=adv_handle,
advertising_event_properties=advertising_properties,
primary_advertising_interval_min=self.advertising_interval_min,
primary_advertising_interval_max=self.advertising_interval_max,
primary_advertising_channel_map=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39
),
own_address_type=own_address_type,
peer_address_type=target.address_type,
peer_address=target,
advertising_tx_power=7,
advertising_filter_policy=0,
primary_advertising_phy=1, # LE 1M
secondary_advertising_max_skip=0,
secondary_advertising_phy=1, # LE 1M
advertising_sid=0,
scan_request_notification_enable=0,
), # type: ignore[call-arg]
check_result=True,
)
# Set the advertising data if present
if advertising_data is not None:
await self.send_command(
HCI_LE_Set_Extended_Advertising_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
advertising_data=advertising_data,
), # type: ignore[call-arg]
check_result=True,
)
# Set the scan response if present
if scan_response is not None:
await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
), # type: ignore[call-arg]
check_result=True,
)
if own_address_type in (
OwnAddressType.RANDOM,
OwnAddressType.RESOLVABLE_OR_RANDOM,
):
await self.send_command(
HCI_LE_Set_Advertising_Set_Random_Address_Command(
advertising_handle=adv_handle,
random_address=self.random_address,
), # type: ignore[call-arg]
check_result=True,
)
# Enable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=1,
advertising_handles=[adv_handle],
durations=[0], # Forever
max_extended_advertising_events=[0], # Infinite
), # type: ignore[call-arg]
check_result=True,
)
except HCI_Error as error:
# When any step fails, cleanup the advertising handle.
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
check_result=False,
)
raise error
self.extended_advertising_handles.add(adv_handle)
return adv_handle
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None:
"""Stops an extended advertising set.
Args:
adv_handle: Handle of the advertising set to stop.
"""
# Disable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=0,
advertising_handles=[adv_handle],
durations=[0],
max_extended_advertising_events=[0],
), # type: ignore[call-arg]
check_result=True,
)
# Remove advertising set
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
check_result=True,
)
self.extended_advertising_handles.remove(adv_handle)
@property @property
def is_advertising(self): def is_advertising(self):
return self.advertising return self.advertising
@@ -2170,7 +2435,9 @@ class Device(CompositeEventEmitter):
check_result=True, check_result=True,
) )
async def disconnect(self, connection, reason): async def disconnect(
self, connection: Union[Connection, ScoLink, CisLink], reason: int
) -> None:
# Create a future so that we can wait for the disconnection's result # Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future() pending_disconnection = asyncio.get_running_loop().create_future()
connection.on('disconnection', pending_disconnection.set_result) connection.on('disconnection', pending_disconnection.set_result)
@@ -2178,7 +2445,7 @@ class Device(CompositeEventEmitter):
# Request a disconnection # Request a disconnection
result = await self.send_command( result = await self.send_command(
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) # type: ignore[call-arg]
) )
try: try:
@@ -2641,6 +2908,154 @@ class Device(CompositeEventEmitter):
self.remove_listener('remote_name', handler) self.remove_listener('remote_name', handler)
self.remove_listener('remote_name_failure', failure_handler) self.remove_listener('remote_name_failure', failure_handler)
# [LE only]
@experimental('Only for testing.')
async def setup_cig(
self,
cig_id: int,
cis_id: List[int],
sdu_interval: Tuple[int, int],
framing: int,
max_sdu: Tuple[int, int],
retransmission_number: int,
max_transport_latency: Tuple[int, int],
) -> List[int]:
"""Sends HCI_LE_Set_CIG_Parameters_Command.
Args:
cig_id: CIG_ID.
cis_id: CID ID list.
sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental).
framing: Un-framing(0) or Framing(1).
max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental).
retransmission_number: retransmission_number.
max_transport_latency: Max transport latencies of
(Central->Peripheral, Peripheral->Cental).
Returns:
List of created CIS handles corresponding to the same order of [cid_id].
"""
num_cis = len(cis_id)
response = await self.send_command(
HCI_LE_Set_CIG_Parameters_Command( # type: ignore[call-arg]
cig_id=cig_id,
sdu_interval_c_to_p=sdu_interval[0],
sdu_interval_p_to_c=sdu_interval[1],
worst_case_sca=0x00, # 251-500 ppm
packing=0x00, # Sequential
framing=framing,
max_transport_latency_c_to_p=max_transport_latency[0],
max_transport_latency_p_to_c=max_transport_latency[1],
cis_id=cis_id,
max_sdu_c_to_p=[max_sdu[0]] * num_cis,
max_sdu_p_to_c=[max_sdu[1]] * num_cis,
phy_c_to_p=[HCI_LE_2M_PHY] * num_cis,
phy_p_to_c=[HCI_LE_2M_PHY] * num_cis,
rtn_c_to_p=[retransmission_number] * num_cis,
rtn_p_to_c=[retransmission_number] * num_cis,
),
check_result=True,
)
# Ideally, we should manage CIG lifecycle, but they are not useful for Unicast
# Server, so here it only provides a basic functionality for testing.
cis_handles = response.return_parameters.connection_handle[:]
for id, cis_handle in zip(cis_id, cis_handles):
self._pending_cis[cis_handle] = (id, cig_id)
return cis_handles
# [LE only]
@experimental('Only for testing.')
async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]:
for cis_handle, acl_handle in cis_acl_pairs:
acl_connection = self.lookup_connection(acl_handle)
assert acl_connection
cis_id, cig_id = self._pending_cis.pop(cis_handle)
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cis_id=cis_id,
cig_id=cig_id,
)
result = await self.send_command(
HCI_LE_Create_CIS_Command( # type: ignore[call-arg]
cis_connection_handle=[p[0] for p in cis_acl_pairs],
acl_connection_handle=[p[1] for p in cis_acl_pairs],
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Create_CIS_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
pending_cis_establishments: Dict[int, asyncio.Future[CisLink]] = {}
for cis_handle, _ in cis_acl_pairs:
pending_cis_establishments[
cis_handle
] = asyncio.get_running_loop().create_future()
with closing(EventWatcher()) as watcher:
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if pending_future := pending_cis_establishments.get(
cis_link.handle, None
):
pending_future.set_result(cis_link)
return await asyncio.gather(*pending_cis_establishments.values())
# [LE only]
@experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink:
result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Accept_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
pending_cis_establishment = asyncio.get_running_loop().create_future()
with closing(EventWatcher()) as watcher:
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if cis_link.handle == handle:
pending_cis_establishment.set_result(cis_link)
return await pending_cis_establishment
# [LE only]
@experimental('Only for testing.')
async def reject_cis_request(
self,
handle: int,
reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
result = await self.send_command(
HCI_LE_Reject_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle, reason=reason
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Reject_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
@host_event_handler @host_event_handler
def on_flush(self): def on_flush(self):
self.emit('flush') self.emit('flush')
@@ -2845,30 +3260,35 @@ class Device(CompositeEventEmitter):
) )
@host_event_handler @host_event_handler
@with_connection_from_handle def on_disconnection(self, connection_handle: int, reason: int) -> None:
def on_disconnection(self, connection, reason): if connection := self.connections.pop(connection_handle, None):
logger.debug( logger.debug(
f'*** Disconnection: [0x{connection.handle:04X}] ' f'*** Disconnection: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, reason={reason}' f'{connection.peer_address} as {connection.role_name}, reason={reason}'
) )
connection.emit('disconnection', reason) connection.emit('disconnection', reason)
# Remove the connection from the map # Cleanup subsystems that maintain per-connection state
del self.connections[connection.handle] self.gatt_server.on_disconnection(connection)
# Cleanup subsystems that maintain per-connection state # Restart advertising if auto-restart is enabled
self.gatt_server.on_disconnection(connection) if self.auto_restart_advertising:
logger.debug('restarting advertising')
# Restart advertising if auto-restart is enabled self.abort_on(
if self.auto_restart_advertising: 'flush',
logger.debug('restarting advertising') self.start_advertising(
self.abort_on( advertising_type=self.advertising_type, # type: ignore[arg-type]
'flush', own_address_type=self.advertising_own_address_type, # type: ignore[arg-type]
self.start_advertising( auto_restart=True,
advertising_type=self.advertising_type, ),
own_address_type=self.advertising_own_address_type, )
auto_restart=True, elif sco_link := self.sco_links.pop(connection_handle, None):
), sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None):
cis_link.emit('disconnection', reason)
else:
logger.error(
f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***'
) )
@host_event_handler @host_event_handler
@@ -3147,6 +3567,107 @@ class Device(CompositeEventEmitter):
connection.emit('remote_name_failure', error) connection.emit('remote_name_failure', error)
self.emit('remote_name_failure', address, error) self.emit('remote_name_failure', address, error)
# [Classic only]
@host_event_handler
@with_connection_from_address
@experimental('Only for testing.')
def on_sco_connection(
self, acl_connection: Connection, sco_handle: int, link_type: int
) -> None:
logger.debug(
f'*** SCO connected: {acl_connection.peer_address}, '
f'sco_handle=[0x{sco_handle:04X}], '
f'link_type=[0x{link_type:02X}] ***'
)
sco_link = self.sco_links[sco_handle] = ScoLink(
device=self,
acl_connection=acl_connection,
handle=sco_handle,
link_type=link_type,
)
self.emit('sco_connection', sco_link)
# [Classic only]
@host_event_handler
@with_connection_from_address
@experimental('Only for testing.')
def on_sco_connection_failure(
self, acl_connection: Connection, status: int
) -> None:
logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
self.emit('sco_connection_failure')
# [Classic only]
@host_event_handler
@experimental('Only for testing')
def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None:
if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet)
# [LE only]
@host_event_handler
@with_connection_from_handle
@experimental('Only for testing')
def on_cis_request(
self,
acl_connection: Connection,
cis_handle: int,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'*** CIS Request '
f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, '
f'cis_handle=[0x{cis_handle:04X}], '
f'cig_id=[0x{cig_id:02X}], '
f'cis_id=[0x{cis_id:02X}] ***'
)
# LE_CIS_Established event doesn't provide info, so we must store them here.
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cig_id=cig_id,
cis_id=cis_id,
)
self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_cis_establishment(self, cis_handle: int) -> None:
cis_link = self.cis_links[cis_handle]
cis_link.state = CisLink.State.ESTABLISHED
assert cis_link.acl_connection
logger.debug(
f'*** CIS Establishment '
f'{cis_link.acl_connection.peer_address}, '
f'cis_handle=[0x{cis_handle:04X}], '
f'cig_id=[0x{cis_link.cig_id:02X}], '
f'cis_id=[0x{cis_link.cis_id:02X}] ***'
)
cis_link.emit('establishment')
self.emit('cis_establishment', cis_link)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
if cis_link := self.cis_links.pop(cis_handle, None):
cis_link.emit('establishment_failure')
self.emit('cis_establishment_failure', cis_handle, status)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None:
if cis_link := self.cis_links.get(handle, None):
cis_link.emit('pdu', packet)
@host_event_handler @host_event_handler
@with_connection_from_handle @with_connection_from_handle
def on_connection_encryption_change(self, connection, encryption): def on_connection_encryption_change(self, connection, encryption):
@@ -3158,10 +3679,18 @@ class Device(CompositeEventEmitter):
connection.encryption = encryption connection.encryption = encryption
if ( if (
not connection.authenticated not connection.authenticated
and connection.transport == BT_BR_EDR_TRANSPORT
and encryption == HCI_Encryption_Change_Event.AES_CCM and encryption == HCI_Encryption_Change_Event.AES_CCM
): ):
connection.authenticated = True connection.authenticated = True
connection.sc = True connection.sc = True
if (
not connection.authenticated
and connection.transport == BT_LE_TRANSPORT
and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM
):
connection.authenticated = True
connection.sc = True
connection.emit('connection_encryption_change') connection.emit('connection_encryption_change')
@host_event_handler @host_event_handler
+116 -6
View File
@@ -93,20 +93,35 @@ GATT_RECONNECTION_CONFIGURATION_SERVICE = UUID.from_16_bits(0x1829, 'Reconne
GATT_INSULIN_DELIVERY_SERVICE = UUID.from_16_bits(0x183A, 'Insulin Delivery') GATT_INSULIN_DELIVERY_SERVICE = UUID.from_16_bits(0x183A, 'Insulin Delivery')
GATT_BINARY_SENSOR_SERVICE = UUID.from_16_bits(0x183B, 'Binary Sensor') GATT_BINARY_SENSOR_SERVICE = UUID.from_16_bits(0x183B, 'Binary Sensor')
GATT_EMERGENCY_CONFIGURATION_SERVICE = UUID.from_16_bits(0x183C, 'Emergency Configuration') GATT_EMERGENCY_CONFIGURATION_SERVICE = UUID.from_16_bits(0x183C, 'Emergency Configuration')
GATT_AUTHORIZATION_CONTROL_SERVICE = UUID.from_16_bits(0x183D, 'Authorization Control')
GATT_PHYSICAL_ACTIVITY_MONITOR_SERVICE = UUID.from_16_bits(0x183E, 'Physical Activity Monitor') GATT_PHYSICAL_ACTIVITY_MONITOR_SERVICE = UUID.from_16_bits(0x183E, 'Physical Activity Monitor')
GATT_ELAPSED_TIME_SERVICE = UUID.from_16_bits(0x183F, 'Elapsed Time')
GATT_GENERIC_HEALTH_SENSOR_SERVICE = UUID.from_16_bits(0x1840, 'Generic Health Sensor')
GATT_AUDIO_INPUT_CONTROL_SERVICE = UUID.from_16_bits(0x1843, 'Audio Input Control') GATT_AUDIO_INPUT_CONTROL_SERVICE = UUID.from_16_bits(0x1843, 'Audio Input Control')
GATT_VOLUME_CONTROL_SERVICE = UUID.from_16_bits(0x1844, 'Volume Control') GATT_VOLUME_CONTROL_SERVICE = UUID.from_16_bits(0x1844, 'Volume Control')
GATT_VOLUME_OFFSET_CONTROL_SERVICE = UUID.from_16_bits(0x1845, 'Volume Offset Control') GATT_VOLUME_OFFSET_CONTROL_SERVICE = UUID.from_16_bits(0x1845, 'Volume Offset Control')
GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification Service') GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification')
GATT_DEVICE_TIME_SERVICE = UUID.from_16_bits(0x1847, 'Device Time') GATT_DEVICE_TIME_SERVICE = UUID.from_16_bits(0x1847, 'Device Time')
GATT_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1848, 'Media Control Service') GATT_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1848, 'Media Control')
GATT_GENERIC_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1849, 'Generic Media Control Service') GATT_GENERIC_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1849, 'Generic Media Control')
GATT_CONSTANT_TONE_EXTENSION_SERVICE = UUID.from_16_bits(0x184A, 'Constant Tone Extension') GATT_CONSTANT_TONE_EXTENSION_SERVICE = UUID.from_16_bits(0x184A, 'Constant Tone Extension')
GATT_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184B, 'Telephone Bearer Service') GATT_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184B, 'Telephone Bearer')
GATT_GENERIC_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer Service') GATT_GENERIC_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer')
GATT_MICROPHONE_CONTROL_SERVICE = UUID.from_16_bits(0x184D, 'Microphone Control') GATT_MICROPHONE_CONTROL_SERVICE = UUID.from_16_bits(0x184D, 'Microphone Control')
GATT_AUDIO_STREAM_CONTROL_SERVICE = UUID.from_16_bits(0x184E, 'Audio Stream Control')
GATT_BROADCAST_AUDIO_SCAN_SERVICE = UUID.from_16_bits(0x184F, 'Broadcast Audio Scan')
GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE = UUID.from_16_bits(0x1850, 'Published Audio Capabilities')
GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1851, 'Basic Audio Announcement')
GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1852, 'Broadcast Audio Announcement')
GATT_COMMON_AUDIO_SERVICE = UUID.from_16_bits(0x1853, 'Common Audio')
GATT_HEARING_ACCESS_SERVICE = UUID.from_16_bits(0x1854, 'Hearing Access')
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE = UUID.from_16_bits(0x1855, 'Telephony and Media Audio')
GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1856, 'Public Broadcast Announcement')
GATT_ELECTRONIC_SHELF_LABEL_SERVICE = UUID.from_16_bits(0X1857, 'Electronic Shelf Label')
GATT_GAMING_AUDIO_SERVICE = UUID.from_16_bits(0x1858, 'Gaming Audio')
GATT_MESH_PROXY_SOLICITATION_SERVICE = UUID.from_16_bits(0x1859, 'Mesh Audio Solicitation')
# Types # Attribute Types
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2800, 'Primary Service') GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2800, 'Primary Service')
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2801, 'Secondary Service') GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2801, 'Secondary Service')
GATT_INCLUDE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2802, 'Include') GATT_INCLUDE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2802, 'Include')
@@ -129,6 +144,8 @@ GATT_ENVIRONMENTAL_SENSING_MEASUREMENT_DESCRIPTOR = UUID.from_16_bits(0x290C,
GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting') GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting')
GATT_TIME_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290E, 'Time Trigger Setting') GATT_TIME_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290E, 'Time Trigger Setting')
GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data') GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
GATT_OBSERVATION_SCHEDULE_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Observation Schedule')
GATT_VALID_RANGE_AND_ACCURACY_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Valid Range And Accuracy')
# Device Information Service # Device Information Service
GATT_SYSTEM_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A23, 'System ID') GATT_SYSTEM_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A23, 'System ID')
@@ -156,6 +173,96 @@ GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart
# Battery Service # Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level') GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
# Telephony And Media Audio Service (TMAS)
GATT_TMAP_ROLE_CHARACTERISTIC = UUID.from_16_bits(0x2B51, 'TMAP Role')
# Audio Input Control Service (AICS)
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B77, 'Audio Input State')
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC = UUID.from_16_bits(0x2B78, 'Gain Settings Attribute')
GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC = UUID.from_16_bits(0x2B79, 'Audio Input Type')
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC = UUID.from_16_bits(0x2B7A, 'Audio Input Status')
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7B, 'Audio Input Control Point')
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC = UUID.from_16_bits(0x2B7C, 'Audio Input Description')
# Volume Control Service (VCS)
GATT_VOLUME_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B7D, 'Volume State')
GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7E, 'Volume Control Point')
GATT_VOLUME_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2B7F, 'Volume Flags')
# Volume Offset Control Service (VOCS)
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B80, 'Volume Offset State')
GATT_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2B81, 'Audio Location')
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B82, 'Volume Offset Control Point')
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC = UUID.from_16_bits(0x2B83, 'Audio Output Description')
# Coordinated Set Identification Service (CSIS)
GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC = UUID.from_16_bits(0x2B84, 'Set Identity Resolving Key')
GATT_COORDINATED_SET_SIZE_CHARACTERISTIC = UUID.from_16_bits(0x2B85, 'Coordinated Set Size')
GATT_SET_MEMBER_LOCK_CHARACTERISTIC = UUID.from_16_bits(0x2B86, 'Set Member Lock')
GATT_SET_MEMBER_RANK_CHARACTERISTIC = UUID.from_16_bits(0x2B87, 'Set Member Rank')
# Media Control Service (MCS)
GATT_MEDIA_PLAYER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2B93, 'Media Player Name')
GATT_MEDIA_PLAYER_ICON_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B94, 'Media Player Icon Object ID')
GATT_MEDIA_PLAYER_ICON_URL_CHARACTERISTIC = UUID.from_16_bits(0x2B95, 'Media Player Icon URL')
GATT_TRACK_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2B96, 'Track Changed')
GATT_TRACK_TITLE_CHARACTERISTIC = UUID.from_16_bits(0x2B97, 'Track Title')
GATT_TRACK_DURATION_CHARACTERISTIC = UUID.from_16_bits(0x2B98, 'Track Duration')
GATT_TRACK_POSITION_CHARACTERISTIC = UUID.from_16_bits(0x2B99, 'Track Position')
GATT_PLAYBACK_SPEED_CHARACTERISTIC = UUID.from_16_bits(0x2B9A, 'Playback Speed')
GATT_SEEKING_SPEED_CHARACTERISTIC = UUID.from_16_bits(0x2B9B, 'Seeking Speed')
GATT_CURRENT_TRACK_SEGMENTS_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9C, 'Current Track Segments Object ID')
GATT_CURRENT_TRACK_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9D, 'Current Track Object ID')
GATT_NEXT_TRACK_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9E, 'Next Track Object ID')
GATT_PARENT_GROUP_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9F, 'Parent Group Object ID')
GATT_CURRENT_GROUP_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BA0, 'Current Group Object ID')
GATT_PLAYING_ORDER_CHARACTERISTIC = UUID.from_16_bits(0x2BA1, 'Playing Order')
GATT_PLAYING_ORDERS_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA2, 'Playing Orders Supported')
GATT_MEDIA_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BA3, 'Media State')
GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BA4, 'Media Control Point')
GATT_MEDIA_CONTROL_POINT_OPCODES_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA5, 'Media Control Point Opcodes Supported')
GATT_SEARCH_RESULTS_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BA6, 'Search Results Object ID')
GATT_SEARCH_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BA7, 'Search Control Point')
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Content Control Id')
# Telephone Bearer Service (TBS)
GATT_BEARER_PROVIDER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BB4, 'Bearer Provider Name')
GATT_BEARER_UCI_CHARACTERISTIC = UUID.from_16_bits(0x2BB5, 'Bearer UCI')
GATT_BEARER_TECHNOLOGY_CHARACTERISTIC = UUID.from_16_bits(0x2BB6, 'Bearer Technology')
GATT_BEARER_URI_SCHEMES_SUPPORTED_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2BB7, 'Bearer URI Schemes Supported List')
GATT_BEARER_SIGNAL_STRENGTH_CHARACTERISTIC = UUID.from_16_bits(0x2BB8, 'Bearer Signal Strength')
GATT_BEARER_SIGNAL_STRENGTH_REPORTING_INTERVAL_CHARACTERISTIC = UUID.from_16_bits(0x2BB9, 'Bearer Signal Strength Reporting Interval')
GATT_BEARER_LIST_CURRENT_CALLS_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Bearer List Current Calls')
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBB, 'Content Control ID')
GATT_STATUS_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2BBC, 'Status Flags')
GATT_INCOMING_CALL_TARGET_BEARER_URI_CHARACTERISTIC = UUID.from_16_bits(0x2BBD, 'Incoming Call Target Bearer URI')
GATT_CALL_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BBE, 'Call State')
GATT_CALL_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BBF, 'Call Control Point')
GATT_CALL_CONTROL_POINT_OPTIONAL_OPCODES_CHARACTERISTIC = UUID.from_16_bits(0x2BC0, 'Call Control Point Optional Opcodes')
GATT_TERMINATION_REASON_CHARACTERISTIC = UUID.from_16_bits(0x2BC1, 'Termination Reason')
GATT_INCOMING_CALL_CHARACTERISTIC = UUID.from_16_bits(0x2BC2, 'Incoming Call')
GATT_CALL_FRIENDLY_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Call Friendly Name')
# Microphone Control Service (MICS)
GATT_MUTE_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Mute')
# Audio Stream Control Service (ASCS)
GATT_SINK_ASE_CHARACTERISTIC = UUID.from_16_bits(0x2BC4, 'Sink ASE')
GATT_SOURCE_ASE_CHARACTERISTIC = UUID.from_16_bits(0x2BC5, 'Source ASE')
GATT_ASE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC6, 'ASE Control Point')
# Broadcast Audio Scan Service (BASS)
GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC7, 'Broadcast Audio Scan Control Point')
GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BC8, 'Broadcast Receive State')
# Published Audio Capabilities Service (PACS)
GATT_SINK_PAC_CHARACTERISTIC = UUID.from_16_bits(0x2BC9, 'Sink PAC')
GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCA, 'Sink Audio Location')
GATT_SOURCE_PAC_CHARACTERISTIC = UUID.from_16_bits(0x2BCB, 'Source PAC')
GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCC, 'Source Audio Location')
GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts')
GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts')
# ASHA Service # ASHA Service
GATT_ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid') GATT_ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties') GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
@@ -177,6 +284,9 @@ GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bi
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time') GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report') GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution') GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B29, 'Client Supported Features')
GATT_DATABASE_HASH_CHARACTERISTIC = UUID.from_16_bits(0x2B2A, 'Database Hash')
GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B3A, 'Server Supported Features')
# fmt: on # fmt: on
# pylint: enable=line-too-long # pylint: enable=line-too-long
+56 -20
View File
@@ -38,6 +38,7 @@ from typing import (
Any, Any,
Iterable, Iterable,
Type, Type,
Set,
TYPE_CHECKING, TYPE_CHECKING,
) )
@@ -128,7 +129,7 @@ class ServiceProxy(AttributeProxy):
included_services: List[ServiceProxy] included_services: List[ServiceProxy]
@staticmethod @staticmethod
def from_client(service_class, client, service_uuid): def from_client(service_class, client: Client, service_uuid: UUID):
# The service and its characteristics are considered to have already been # The service and its characteristics are considered to have already been
# discovered # discovered
services = client.get_services_by_uuid(service_uuid) services = client.get_services_by_uuid(service_uuid)
@@ -206,11 +207,11 @@ class CharacteristicProxy(AttributeProxy):
return await self.client.subscribe(self, subscriber, prefer_notify) return await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None): async def unsubscribe(self, subscriber=None, force=False):
if subscriber in self.subscribers: if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber) subscriber = self.subscribers.pop(subscriber)
return await self.client.unsubscribe(self, subscriber) return await self.client.unsubscribe(self, subscriber, force)
def __str__(self) -> str: def __str__(self) -> str:
return ( return (
@@ -246,8 +247,12 @@ class ProfileServiceProxy:
class Client: class Client:
services: List[ServiceProxy] services: List[ServiceProxy]
cached_values: Dict[int, Tuple[datetime, bytes]] cached_values: Dict[int, Tuple[datetime, bytes]]
notification_subscribers: Dict[int, Callable[[bytes], Any]] notification_subscribers: Dict[
indication_subscribers: Dict[int, Callable[[bytes], Any]] int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
indication_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
pending_response: Optional[asyncio.futures.Future[ATT_PDU]] pending_response: Optional[asyncio.futures.Future[ATT_PDU]]
pending_request: Optional[ATT_PDU] pending_request: Optional[ATT_PDU]
@@ -257,10 +262,8 @@ class Client:
self.request_semaphore = asyncio.Semaphore(1) self.request_semaphore = asyncio.Semaphore(1)
self.pending_request = None self.pending_request = None
self.pending_response = None self.pending_response = None
self.notification_subscribers = ( self.notification_subscribers = {} # Subscriber set, by attribute handle
{} self.indication_subscribers = {} # Subscriber set, by attribute handle
) # Notification subscribers, by attribute handle
self.indication_subscribers = {} # Indication subscribers, by attribute handle
self.services = [] self.services = []
self.cached_values = {} self.cached_values = {}
@@ -682,8 +685,8 @@ class Client:
async def discover_descriptors( async def discover_descriptors(
self, self,
characteristic: Optional[CharacteristicProxy] = None, characteristic: Optional[CharacteristicProxy] = None,
start_handle=None, start_handle: Optional[int] = None,
end_handle=None, end_handle: Optional[int] = None,
) -> List[DescriptorProxy]: ) -> List[DescriptorProxy]:
''' '''
See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors
@@ -789,7 +792,12 @@ class Client:
return attributes return attributes
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True): async def subscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
prefer_notify: bool = True,
) -> None:
# If we haven't already discovered the descriptors for this characteristic, # If we haven't already discovered the descriptors for this characteristic,
# do it now # do it now
if not characteristic.descriptors_discovered: if not characteristic.descriptors_discovered:
@@ -826,6 +834,7 @@ class Client:
subscriber_set = subscribers.setdefault(characteristic.handle, set()) subscriber_set = subscribers.setdefault(characteristic.handle, set())
if subscriber is not None: if subscriber is not None:
subscriber_set.add(subscriber) subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the # Add the characteristic as a subscriber, which will result in the
# characteristic emitting an 'update' event when a notification or indication # characteristic emitting an 'update' event when a notification or indication
# is received # is received
@@ -833,7 +842,18 @@ class Client:
await self.write_value(cccd, struct.pack('<H', bits), with_response=True) await self.write_value(cccd, struct.pack('<H', bits), with_response=True)
async def unsubscribe(self, characteristic, subscriber=None): async def unsubscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
force: bool = False,
) -> None:
'''
Unsubscribe from a characteristic.
If `force` is True, this will write zeros to the CCCD when there are no
subscribers left, even if there were already no registered subscribers.
'''
# If we haven't already discovered the descriptors for this characteristic, # If we haven't already discovered the descriptors for this characteristic,
# do it now # do it now
if not characteristic.descriptors_discovered: if not characteristic.descriptors_discovered:
@@ -847,31 +867,45 @@ class Client:
logger.warning('unsubscribing from characteristic with no CCCD descriptor') logger.warning('unsubscribing from characteristic with no CCCD descriptor')
return return
# Check if the characteristic has subscribers
if not (
characteristic.handle in self.notification_subscribers
or characteristic.handle in self.indication_subscribers
):
if not force:
return
# Remove the subscriber(s)
if subscriber is not None: if subscriber is not None:
# Remove matching subscriber from subscriber sets # Remove matching subscriber from subscriber sets
for subscriber_set in ( for subscriber_set in (
self.notification_subscribers, self.notification_subscribers,
self.indication_subscribers, self.indication_subscribers,
): ):
subscribers = subscriber_set.get(characteristic.handle, []) if (
if subscriber in subscribers: subscribers := subscriber_set.get(characteristic.handle)
) and subscriber in subscribers:
subscribers.remove(subscriber) subscribers.remove(subscriber)
# Cleanup if we removed the last one # Cleanup if we removed the last one
if not subscribers: if not subscribers:
del subscriber_set[characteristic.handle] del subscriber_set[characteristic.handle]
else: else:
# Remove all subscribers for this attribute from the sets! # Remove all subscribers for this attribute from the sets
self.notification_subscribers.pop(characteristic.handle, None) self.notification_subscribers.pop(characteristic.handle, None)
self.indication_subscribers.pop(characteristic.handle, None) self.indication_subscribers.pop(characteristic.handle, None)
if not self.notification_subscribers and not self.indication_subscribers: # Update the CCCD
if not (
characteristic.handle in self.notification_subscribers
or characteristic.handle in self.indication_subscribers
):
# No more subscribers left # No more subscribers left
await self.write_value(cccd, b'\x00\x00', with_response=True) await self.write_value(cccd, b'\x00\x00', with_response=True)
async def read_value( async def read_value(
self, attribute: Union[int, AttributeProxy], no_long_read: bool = False self, attribute: Union[int, AttributeProxy], no_long_read: bool = False
) -> Any: ) -> bytes:
''' '''
See Vol 3, Part G - 4.8.1 Read Characteristic Value See Vol 3, Part G - 4.8.1 Read Characteristic Value
@@ -1067,7 +1101,7 @@ class Client:
def on_att_handle_value_notification(self, notification): def on_att_handle_value_notification(self, notification):
# Call all subscribers # Call all subscribers
subscribers = self.notification_subscribers.get( subscribers = self.notification_subscribers.get(
notification.attribute_handle, [] notification.attribute_handle, set()
) )
if not subscribers: if not subscribers:
logger.warning('!!! received notification with no subscriber') logger.warning('!!! received notification with no subscriber')
@@ -1081,7 +1115,9 @@ class Client:
def on_att_handle_value_indication(self, indication): def on_att_handle_value_indication(self, indication):
# Call all subscribers # Call all subscribers
subscribers = self.indication_subscribers.get(indication.attribute_handle, []) subscribers = self.indication_subscribers.get(
indication.attribute_handle, set()
)
if not subscribers: if not subscribers:
logger.warning('!!! received indication with no subscriber') logger.warning('!!! received indication with no subscriber')
+113 -99
View File
@@ -17,6 +17,7 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations from __future__ import annotations
import collections import collections
import dataclasses
import enum import enum
import functools import functools
import logging import logging
@@ -1382,6 +1383,45 @@ HCI_LE_SUPPORTED_FEATURES_NAMES = {
STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)} STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)}
class CodecID(enum.IntEnum):
# fmt: off
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
LINEAR_PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
VENDOR_SPECIFIC = 0xFF
@dataclasses.dataclass(frozen=True)
class CodingFormat:
codec_id: CodecID
company_id: int = 0
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int):
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
return offset + 5, cls(
codec_id=CodecID(codec_id),
company_id=company_id,
vendor_specific_codec_id=vendor_specific_codec_id,
)
def to_bytes(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_Constant: class HCI_Constant:
@staticmethod @staticmethod
@@ -1477,6 +1517,12 @@ class HCI_Object:
# The rest of the bytes # The rest of the bytes
field_value = data[offset:] field_value = data[offset:]
return (field_value, len(field_value)) return (field_value, len(field_value))
if field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_length = data[offset]
offset += 1
field_value = data[offset : offset + field_length]
return (field_value, field_length + 1)
if field_type == 1: if field_type == 1:
# 8-bit unsigned # 8-bit unsigned
return (data[offset], 1) return (data[offset], 1)
@@ -1581,6 +1627,11 @@ class HCI_Object:
raise ValueError('value too large for *-typed field') raise ValueError('value too large for *-typed field')
else: else:
field_bytes = bytes(field_value) field_bytes = bytes(field_value)
elif field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_bytes = bytes(field_bytes)
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr( elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes' field_value, 'to_bytes'
): ):
@@ -1888,6 +1939,7 @@ Address.NIL = Address(b"\xff\xff\xff\xff\xff\xff", Address.PUBLIC_DEVICE_ADDRESS
Address.ANY = Address(b"\x00\x00\x00\x00\x00\x00", Address.PUBLIC_DEVICE_ADDRESS) Address.ANY = Address(b"\x00\x00\x00\x00\x00\x00", Address.PUBLIC_DEVICE_ADDRESS)
Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_ADDRESS) Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_ADDRESS)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class OwnAddressType: class OwnAddressType:
PUBLIC = 0 PUBLIC = 0
@@ -2445,14 +2497,14 @@ class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command):
('connection_handle', 2), ('connection_handle', 2),
('transmit_bandwidth', 4), ('transmit_bandwidth', 4),
('receive_bandwidth', 4), ('receive_bandwidth', 4),
('transmit_coding_format', 5), ('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', 5), ('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_codec_frame_size', 2), ('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2), ('receive_codec_frame_size', 2),
('input_bandwidth', 4), ('input_bandwidth', 4),
('output_bandwidth', 4), ('output_bandwidth', 4),
('input_coding_format', 5), ('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', 5), ('output_coding_format', CodingFormat.parse_from_bytes),
('input_coded_data_size', 2), ('input_coded_data_size', 2),
('output_coded_data_size', 2), ('output_coded_data_size', 2),
('input_pcm_data_format', 1), ('input_pcm_data_format', 1),
@@ -2473,22 +2525,6 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
See Bluetooth spec @ 7.1.45 Enhanced Setup Synchronous Connection Command See Bluetooth spec @ 7.1.45 Enhanced Setup Synchronous Connection Command
''' '''
class CodingFormat(enum.IntEnum):
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
def to_bytes(self):
return self.value.to_bytes(5, 'little')
def __bytes__(self):
return self.to_bytes()
class PcmDataFormat(enum.IntEnum): class PcmDataFormat(enum.IntEnum):
NA = 0x00 NA = 0x00
ONES_COMPLEMENT = 0x01 ONES_COMPLEMENT = 0x01
@@ -2525,14 +2561,14 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
('bd_addr', Address.parse_address), ('bd_addr', Address.parse_address),
('transmit_bandwidth', 4), ('transmit_bandwidth', 4),
('receive_bandwidth', 4), ('receive_bandwidth', 4),
('transmit_coding_format', 5), ('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', 5), ('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_codec_frame_size', 2), ('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2), ('receive_codec_frame_size', 2),
('input_bandwidth', 4), ('input_bandwidth', 4),
('output_bandwidth', 4), ('output_bandwidth', 4),
('input_coding_format', 5), ('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', 5), ('output_coding_format', CodingFormat.parse_from_bytes),
('input_coded_data_size', 2), ('input_coded_data_size', 2),
('output_coded_data_size', 2), ('output_coded_data_size', 2),
('input_pcm_data_format', 1), ('input_pcm_data_format', 1),
@@ -3829,8 +3865,10 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'advertising_event_properties', 'advertising_event_properties',
{ {
'size': 2, 'size': 2,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string( 'mapper': lambda x: str(
x HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
x
)
), ),
}, },
), ),
@@ -3840,8 +3878,8 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'primary_advertising_channel_map', 'primary_advertising_channel_map',
{ {
'size': 1, 'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string( 'mapper': lambda x: str(
x HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap(x)
), ),
}, },
), ),
@@ -3863,38 +3901,33 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
''' '''
CONNECTABLE_ADVERTISING = 0 class AdvertisingProperties(enum.IntFlag):
SCANNABLE_ADVERTISING = 1 CONNECTABLE_ADVERTISING = 1 << 0
DIRECTED_ADVERTISING = 2 SCANNABLE_ADVERTISING = 1 << 1
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3 DIRECTED_ADVERTISING = 1 << 2
USE_LEGACY_ADVERTISING_PDUS = 4 HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3
ANONYMOUS_ADVERTISING = 5 USE_LEGACY_ADVERTISING_PDUS = 1 << 4
INCLUDE_TX_POWER = 6 ANONYMOUS_ADVERTISING = 1 << 5
INCLUDE_TX_POWER = 1 << 6
ADVERTISING_PROPERTIES_NAMES = ( def __str__(self) -> str:
'CONNECTABLE_ADVERTISING', return '|'.join(
'SCANNABLE_ADVERTISING', flag.name
'DIRECTED_ADVERTISING', for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING', if self.value & flag.value and flag.name is not None
'USE_LEGACY_ADVERTISING_PDUS', )
'ANONYMOUS_ADVERTISING',
'INCLUDE_TX_POWER',
)
CHANNEL_37 = 0 class ChannelMap(enum.IntFlag):
CHANNEL_38 = 1 CHANNEL_37 = 1 << 0
CHANNEL_39 = 2 CHANNEL_38 = 1 << 1
CHANNEL_39 = 1 << 2
CHANNEL_NAMES = ('37', '38', '39') def __str__(self) -> str:
return '|'.join(
@classmethod flag.name
def advertising_properties_string(cls, properties): for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap
# pylint: disable=line-too-long if self.value & flag.value and flag.name is not None
return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]' )
@classmethod
def channel_map_string(cls, channel_map):
return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -3906,9 +3939,9 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'operation', 'operation',
{ {
'size': 1, 'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name( 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x x
), ).name,
}, },
), ),
('fragment_preference', 1), ('fragment_preference', 1),
@@ -3926,23 +3959,12 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
''' '''
INTERMEDIATE_FRAGMENT = 0x00 class Operation(enum.IntEnum):
FIRST_FRAGMENT = 0x01 INTERMEDIATE_FRAGMENT = 0x00
LAST_FRAGMENT = 0x02 FIRST_FRAGMENT = 0x01
COMPLETE_DATA = 0x03 LAST_FRAGMENT = 0x02
UNCHANGED_DATA = 0x04 COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
UNCHANGED_DATA: 'UNCHANGED_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -3954,9 +3976,9 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
'operation', 'operation',
{ {
'size': 1, 'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name( 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x x
), ).name,
}, },
), ),
('fragment_preference', 1), ('fragment_preference', 1),
@@ -3974,22 +3996,6 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
''' '''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command( @HCI_Command.command(
@@ -4481,7 +4487,10 @@ class HCI_LE_Accept_CIS_Request_Command(HCI_Command):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command( @HCI_Command.command(
fields=[('connection_handle', 2)], fields=[
('connection_handle', 2),
('reason', {'size': 1, 'mapper': HCI_Constant.error_name}),
],
) )
class HCI_LE_Reject_CIS_Request_Command(HCI_Command): class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
''' '''
@@ -4489,6 +4498,7 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
''' '''
connection_handle: int connection_handle: int
reason: int
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -4497,9 +4507,9 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
('connection_handle', 2), ('connection_handle', 2),
('data_path_direction', 1), ('data_path_direction', 1),
('data_path_id', 1), ('data_path_id', 1),
('codec_id', 5), ('codec_id', CodingFormat.parse_from_bytes),
('controller_delay', 3), ('controller_delay', 3),
('codec_configuration', '*'), ('codec_configuration', 'v'),
], ],
return_parameters_fields=[ return_parameters_fields=[
('status', STATUS_SPEC), ('status', STATUS_SPEC),
@@ -4514,9 +4524,9 @@ class HCI_LE_Setup_ISO_Data_Path_Command(HCI_Command):
connection_handle: int connection_handle: int
data_path_direction: int data_path_direction: int
data_path_id: int data_path_id: int
codec_id: int codec_id: CodingFormat
controller_delay: int controller_delay: int
codec_configuration: int codec_configuration: bytes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -5326,6 +5336,10 @@ class HCI_Disconnection_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.5 Disconnection Complete Event See Bluetooth spec @ 7.7.5 Disconnection Complete Event
''' '''
status: int
connection_handle: int
reason: int
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)]) @HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)])
+67 -42
View File
@@ -15,30 +15,39 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
from collections.abc import Callable, MutableMapping
from typing import cast, Any
import logging import logging
from .colors import color from bumble import avdtp
from .att import ATT_CID, ATT_PDU from bumble.colors import color
from .smp import SMP_CID, SMP_Command from bumble.att import ATT_CID, ATT_PDU
from .core import name_or_number from bumble.smp import SMP_CID, SMP_Command
from .l2cap import ( from bumble.core import name_or_number
from bumble.l2cap import (
L2CAP_PDU, L2CAP_PDU,
L2CAP_CONNECTION_REQUEST, L2CAP_CONNECTION_REQUEST,
L2CAP_CONNECTION_RESPONSE, L2CAP_CONNECTION_RESPONSE,
L2CAP_SIGNALING_CID, L2CAP_SIGNALING_CID,
L2CAP_LE_SIGNALING_CID, L2CAP_LE_SIGNALING_CID,
L2CAP_Control_Frame, L2CAP_Control_Frame,
L2CAP_Connection_Request,
L2CAP_Connection_Response, L2CAP_Connection_Response,
) )
from .hci import ( from bumble.hci import (
HCI_EVENT_PACKET, HCI_EVENT_PACKET,
HCI_ACL_DATA_PACKET, HCI_ACL_DATA_PACKET,
HCI_DISCONNECTION_COMPLETE_EVENT, HCI_DISCONNECTION_COMPLETE_EVENT,
HCI_AclDataPacketAssembler, HCI_AclDataPacketAssembler,
HCI_Packet,
HCI_Event,
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
) )
from .rfcomm import RFCOMM_Frame, RFCOMM_PSM from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from .sdp import SDP_PDU, SDP_PSM from bumble.sdp import SDP_PDU, SDP_PSM
from .avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -50,23 +59,25 @@ logger = logging.getLogger(__name__)
PSM_NAMES = { PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM', RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP', SDP_PSM: 'SDP',
AVDTP_PSM: 'AVDTP' avdtp.AVDTP_PSM: 'AVDTP',
# TODO: add more PSM values
} }
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class PacketTracer: class PacketTracer:
class AclStream: class AclStream:
def __init__(self, analyzer): psms: MutableMapping[int, int]
peer: PacketTracer.AclStream
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
self.analyzer = analyzer self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.psms = {} # PSM, by source_cid self.psms = {} # PSM, by source_cid
self.peer = None # ACL stream in the other direction
# pylint: disable=too-many-nested-blocks # pylint: disable=too-many-nested-blocks
def on_acl_pdu(self, pdu): def on_acl_pdu(self, pdu: bytes) -> None:
l2cap_pdu = L2CAP_PDU.from_bytes(pdu) l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
if l2cap_pdu.cid == ATT_CID: if l2cap_pdu.cid == ATT_CID:
@@ -81,26 +92,30 @@ class PacketTracer:
# Check if this signals a new channel # Check if this signals a new channel
if control_frame.code == L2CAP_CONNECTION_REQUEST: if control_frame.code == L2CAP_CONNECTION_REQUEST:
self.psms[control_frame.source_cid] = control_frame.psm connection_request = cast(L2CAP_Connection_Request, control_frame)
self.psms[connection_request.source_cid] = connection_request.psm
elif control_frame.code == L2CAP_CONNECTION_RESPONSE: elif control_frame.code == L2CAP_CONNECTION_RESPONSE:
connection_response = cast(L2CAP_Connection_Response, control_frame)
if ( if (
control_frame.result connection_response.result
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
): ):
if self.peer: if self.peer:
if psm := self.peer.psms.get(control_frame.source_cid): if psm := self.peer.psms.get(
connection_response.source_cid
):
# Found a pending connection # Found a pending connection
self.psms[control_frame.destination_cid] = psm self.psms[connection_response.destination_cid] = psm
# For AVDTP connections, create a packet assembler for # For AVDTP connections, create a packet assembler for
# each direction # each direction
if psm == AVDTP_PSM: if psm == avdtp.AVDTP_PSM:
self.avdtp_assemblers[ self.avdtp_assemblers[
control_frame.source_cid connection_response.source_cid
] = AVDTP_MessageAssembler(self.on_avdtp_message) ] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[ self.peer.avdtp_assemblers[
control_frame.destination_cid connection_response.destination_cid
] = AVDTP_MessageAssembler( ] = avdtp.MessageAssembler(
self.peer.on_avdtp_message self.peer.on_avdtp_message
) )
@@ -113,7 +128,7 @@ class PacketTracer:
elif psm == RFCOMM_PSM: elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload) rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame) self.analyzer.emit(rfcomm_frame)
elif psm == AVDTP_PSM: elif psm == avdtp.AVDTP_PSM:
self.analyzer.emit( self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, ' f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}' f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
@@ -130,22 +145,26 @@ class PacketTracer:
else: else:
self.analyzer.emit(l2cap_pdu) self.analyzer.emit(l2cap_pdu)
def on_avdtp_message(self, transaction_label, message): def on_avdtp_message(
self, transaction_label: int, message: avdtp.Message
) -> None:
self.analyzer.emit( self.analyzer.emit(
f'{color("AVDTP", "green")} [{transaction_label}] {message}' f'{color("AVDTP", "green")} [{transaction_label}] {message}'
) )
def feed_packet(self, packet): def feed_packet(self, packet: HCI_AclDataPacket) -> None:
self.packet_assembler.feed_packet(packet) self.packet_assembler.feed_packet(packet)
class Analyzer: class Analyzer:
def __init__(self, label, emit_message): acl_streams: MutableMapping[int, PacketTracer.AclStream]
peer: PacketTracer.Analyzer
def __init__(self, label: str, emit_message: Callable[..., None]) -> None:
self.label = label self.label = label
self.emit_message = emit_message self.emit_message = emit_message
self.acl_streams = {} # ACL streams, by connection handle self.acl_streams = {} # ACL streams, by connection handle
self.peer = None # Analyzer in the other direction
def start_acl_stream(self, connection_handle): def start_acl_stream(self, connection_handle: int) -> PacketTracer.AclStream:
logger.info( logger.info(
f'[{self.label}] +++ Creating ACL stream for connection ' f'[{self.label}] +++ Creating ACL stream for connection '
f'0x{connection_handle:04X}' f'0x{connection_handle:04X}'
@@ -160,7 +179,7 @@ class PacketTracer:
return stream return stream
def end_acl_stream(self, connection_handle): def end_acl_stream(self, connection_handle: int) -> None:
if connection_handle in self.acl_streams: if connection_handle in self.acl_streams:
logger.info( logger.info(
f'[{self.label}] --- Removing ACL stream for connection ' f'[{self.label}] --- Removing ACL stream for connection '
@@ -171,23 +190,29 @@ class PacketTracer:
# Let the other forwarder know so it can cleanup its stream as well # Let the other forwarder know so it can cleanup its stream as well
self.peer.end_acl_stream(connection_handle) self.peer.end_acl_stream(connection_handle)
def on_packet(self, packet): def on_packet(self, packet: HCI_Packet) -> None:
self.emit(packet) self.emit(packet)
if packet.hci_packet_type == HCI_ACL_DATA_PACKET: if packet.hci_packet_type == HCI_ACL_DATA_PACKET:
acl_packet = cast(HCI_AclDataPacket, packet)
# Look for an existing stream for this handle, create one if it is the # Look for an existing stream for this handle, create one if it is the
# first ACL packet for that connection handle # first ACL packet for that connection handle
if (stream := self.acl_streams.get(packet.connection_handle)) is None: if (
stream = self.start_acl_stream(packet.connection_handle) stream := self.acl_streams.get(acl_packet.connection_handle)
stream.feed_packet(packet) ) is None:
stream = self.start_acl_stream(acl_packet.connection_handle)
stream.feed_packet(acl_packet)
elif packet.hci_packet_type == HCI_EVENT_PACKET: elif packet.hci_packet_type == HCI_EVENT_PACKET:
if packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT: event_packet = cast(HCI_Event, packet)
self.end_acl_stream(packet.connection_handle) if event_packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(
cast(HCI_Disconnection_Complete_Event, packet).connection_handle
)
def emit(self, message): def emit(self, message: Any) -> None:
self.emit_message(f'[{self.label}] {message}') self.emit_message(f'[{self.label}] {message}')
def trace(self, packet, direction=0): def trace(self, packet: HCI_Packet, direction: int = 0) -> None:
if direction == 0: if direction == 0:
self.host_to_controller_analyzer.on_packet(packet) self.host_to_controller_analyzer.on_packet(packet)
else: else:
@@ -195,10 +220,10 @@ class PacketTracer:
def __init__( def __init__(
self, self,
host_to_controller_label=color('HOST->CONTROLLER', 'blue'), host_to_controller_label: str = color('HOST->CONTROLLER', 'blue'),
controller_to_host_label=color('CONTROLLER->HOST', 'cyan'), controller_to_host_label: str = color('CONTROLLER->HOST', 'cyan'),
emit_message=logger.info, emit_message: Callable[..., None] = logger.info,
): ) -> None:
self.host_to_controller_analyzer = PacketTracer.Analyzer( self.host_to_controller_analyzer = PacketTracer.Analyzer(
host_to_controller_label, emit_message host_to_controller_label, emit_message
) )
+37 -27
View File
@@ -22,7 +22,7 @@ import dataclasses
import enum import enum
import traceback import traceback
import warnings import warnings
from typing import Dict, List, Union, Set, TYPE_CHECKING from typing import Dict, List, Union, Set, Any, TYPE_CHECKING
from . import at from . import at
from . import rfcomm from . import rfcomm
@@ -35,7 +35,11 @@ from bumble.core import (
BT_L2CAP_PROTOCOL_ID, BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID,
) )
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command from bumble.hci import (
HCI_Enhanced_Setup_Synchronous_Connection_Command,
CodingFormat,
CodecID,
)
from bumble.sdp import ( from bumble.sdp import (
DataElement, DataElement,
ServiceAttribute, ServiceAttribute,
@@ -66,6 +70,7 @@ class HfpProtocolError(ProtocolError):
# Protocol Support # Protocol Support
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HfpProtocol: class HfpProtocol:
dlc: rfcomm.DLC dlc: rfcomm.DLC
@@ -842,19 +847,15 @@ class DefaultCodecParameters(enum.IntEnum):
@dataclasses.dataclass @dataclasses.dataclass
class EscoParameters: class EscoParameters:
# Codec specific # Codec specific
transmit_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat transmit_coding_format: CodingFormat
receive_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat receive_coding_format: CodingFormat
packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType
retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort
max_latency: int max_latency: int
# Common # Common
input_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = ( input_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
)
output_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT
)
input_coded_data_size: int = 16 input_coded_data_size: int = 16
output_coded_data_size: int = 16 output_coded_data_size: int = 16
input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = ( input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = (
@@ -880,26 +881,31 @@ class EscoParameters:
transmit_codec_frame_size: int = 60 transmit_codec_frame_size: int = 60
receive_codec_frame_size: int = 60 receive_codec_frame_size: int = 60
def asdict(self) -> Dict[str, Any]:
# dataclasses.asdict() will recursively deep-copy the entire object,
# which is expensive and breaks CodingFormat object, so let it simply copy here.
return self.__dict__
_ESCO_PARAMETERS_CVSD_D0 = EscoParameters( _ESCO_PARAMETERS_CVSD_D0 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF, max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1, packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION, retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
) )
_ESCO_PARAMETERS_CVSD_D1 = EscoParameters( _ESCO_PARAMETERS_CVSD_D1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF, max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3, packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION, retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
) )
_ESCO_PARAMETERS_CVSD_S1 = EscoParameters( _ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007, max_latency=0x0007,
packet_type=( packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -912,8 +918,8 @@ _ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
) )
_ESCO_PARAMETERS_CVSD_S2 = EscoParameters( _ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007, max_latency=0x0007,
packet_type=( packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -925,8 +931,8 @@ _ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
) )
_ESCO_PARAMETERS_CVSD_S3 = EscoParameters( _ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000A, max_latency=0x000A,
packet_type=( packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -938,8 +944,8 @@ _ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
) )
_ESCO_PARAMETERS_CVSD_S4 = EscoParameters( _ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000C, max_latency=0x000C,
packet_type=( packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -951,8 +957,8 @@ _ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
) )
_ESCO_PARAMETERS_MSBC_T1 = EscoParameters( _ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC, transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC, receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x0008, max_latency=0x0008,
packet_type=( packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -960,12 +966,14 @@ _ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
), ),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY, retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
) )
_ESCO_PARAMETERS_MSBC_T2 = EscoParameters( _ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC, transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC, receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x000D, max_latency=0x000D,
packet_type=( packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -974,10 +982,12 @@ _ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
), ),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY, retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
) )
ESCO_PERAMETERS = { ESCO_PARAMETERS = {
DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0, DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0,
DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1, DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1,
DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1, DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1,
+27 -3
View File
@@ -32,8 +32,8 @@ from .hci import (
Address, Address,
HCI_ACL_DATA_PACKET, HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET, HCI_COMMAND_PACKET,
HCI_COMMAND_COMPLETE_EVENT,
HCI_EVENT_PACKET, HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND, HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND, HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
@@ -52,6 +52,7 @@ from .hci import (
HCI_Constant, HCI_Constant,
HCI_Error, HCI_Error,
HCI_Event, HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command, HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command, HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command, HCI_LE_Read_Buffer_Size_Command,
@@ -75,7 +76,6 @@ from .core import (
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
ConnectionPHY, ConnectionPHY,
ConnectionParameters, ConnectionParameters,
InvalidStateError,
) )
from .utils import AbortableEventEmitter from .utils import AbortableEventEmitter
from .transport.common import TransportLostError from .transport.common import TransportLostError
@@ -243,7 +243,7 @@ class Host(AbortableEventEmitter):
# understand # understand
le_event_mask = bytes.fromhex('1F00000000000000') le_event_mask = bytes.fromhex('1F00000000000000')
else: else:
le_event_mask = bytes.fromhex('FFFFF00000000000') le_event_mask = bytes.fromhex('FFFFFFFF00000000')
await self.send_command( await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask) HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
@@ -495,6 +495,8 @@ class Host(AbortableEventEmitter):
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet)) self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET: elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet)) self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
else: else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}') logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
@@ -515,6 +517,10 @@ class Host(AbortableEventEmitter):
# Experimental # Experimental
self.emit('sco_packet', packet.connection_handle, packet) self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None: def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
self.emit('l2cap_pdu', connection.handle, cid, pdu) self.emit('l2cap_pdu', connection.handle, cid, pdu)
@@ -715,6 +721,24 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event): def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event) self.on_hci_le_advertising_report_event(event)
def on_hci_le_cis_request_event(self, event):
self.emit(
'cis_request',
event.acl_connection_handle,
event.cis_connection_handle,
event.cig_id,
event.cis_id,
)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
'cis_establishment_failure', event.connection_handle, event.status
)
def on_hci_le_remote_connection_parameter_request_event(self, event): def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections: if event.connection_handle not in self.connections:
logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle') logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle')
+8
View File
@@ -391,6 +391,9 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
''' '''
psm: int
source_cid: int
@staticmethod @staticmethod
def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]: def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]:
psm_length = 2 psm_length = 2
@@ -432,6 +435,11 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
''' '''
source_cid: int
destination_cid: int
status: int
result: int
CONNECTION_SUCCESSFUL = 0x0000 CONNECTION_SUCCESSFUL = 0x0000
CONNECTION_PENDING = 0x0001 CONNECTION_PENDING = 0x0001
CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002 CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
+147
View File
@@ -0,0 +1,147 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import Optional
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class SirkType(enum.IntEnum):
'''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.'''
ENCRYPTED = 0x00
PLAINTEXT = 0x01
class MemberLock(enum.IntEnum):
'''Coordinated Set Identification Service - 5.3 Set Member Lock.'''
UNLOCKED = 0x01
LOCKED = 0x02
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
# TODO: Implement RSI Generator
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class CoordinatedSetIdentificationService(gatt.TemplateService):
UUID = gatt.GATT_COORDINATED_SET_IDENTIFICATION_SERVICE
set_identity_resolving_key_characteristic: gatt.Characteristic
coordinated_set_size_characteristic: Optional[gatt.Characteristic] = None
set_member_lock_characteristic: Optional[gatt.Characteristic] = None
set_member_rank_characteristic: Optional[gatt.Characteristic] = None
def __init__(
self,
set_identity_resolving_key: bytes,
coordinated_set_size: Optional[int] = None,
set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None,
) -> None:
characteristics = []
self.set_identity_resolving_key_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
# TODO: Implement encrypted SIRK reader.
value=struct.pack('B', SirkType.PLAINTEXT) + set_identity_resolving_key,
)
characteristics.append(self.set_identity_resolving_key_characteristic)
if coordinated_set_size is not None:
self.coordinated_set_size_characteristic = gatt.Characteristic(
uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('B', coordinated_set_size),
)
characteristics.append(self.coordinated_set_size_characteristic)
if set_member_lock is not None:
self.set_member_lock_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY
| gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.READABLE
| gatt.Characteristic.Permissions.WRITEABLE,
value=struct.pack('B', set_member_lock),
)
characteristics.append(self.set_member_lock_characteristic)
if set_member_rank is not None:
self.set_member_rank_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('B', set_member_rank),
)
characteristics.append(self.set_member_rank_characteristic)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = CoordinatedSetIdentificationService
set_identity_resolving_key: gatt_client.CharacteristicProxy
coordinated_set_size: Optional[gatt_client.CharacteristicProxy] = None
set_member_lock: Optional[gatt_client.CharacteristicProxy] = None
set_member_rank: Optional[gatt_client.CharacteristicProxy] = None
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.set_identity_resolving_key = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC
)[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC
):
self.coordinated_set_size = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC
):
self.set_member_lock = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC
):
self.set_member_rank = characteristics[0]
+66 -43
View File
@@ -187,8 +187,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ = 0b00100000 SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt # Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031') SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032') SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# fmt: on # fmt: on
# pylint: enable=line-too-long # pylint: enable=line-too-long
@@ -579,7 +579,7 @@ class OobContext:
self.r = crypto.r() if r is None else r self.r = crypto.r() if r is None else r
def share(self) -> OobSharedData: def share(self) -> OobSharedData:
pkx = bytes(reversed(self.ecc_key.x)) pkx = self.ecc_key.x[::-1]
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r) return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
@@ -677,6 +677,13 @@ class Session:
}, },
} }
ea: bytes
eb: bytes
ltk: bytes
preq: bytes
pres: bytes
tk: bytes
def __init__( def __init__(
self, self,
manager: Manager, manager: Manager,
@@ -686,15 +693,10 @@ class Session:
) -> None: ) -> None:
self.manager = manager self.manager = manager
self.connection = connection self.connection = connection
self.preq: Optional[bytes] = None
self.pres: Optional[bytes] = None
self.ea = None
self.eb = None
self.stk = None self.stk = None
self.ltk = None
self.ltk_ediv = 0 self.ltk_ediv = 0
self.ltk_rand = bytes(8) self.ltk_rand = bytes(8)
self.link_key = None self.link_key: Optional[bytes] = None
self.initiator_key_distribution: int = 0 self.initiator_key_distribution: int = 0
self.responder_key_distribution: int = 0 self.responder_key_distribution: int = 0
self.peer_random_value: Optional[bytes] = None self.peer_random_value: Optional[bytes] = None
@@ -787,9 +789,7 @@ class Session:
) )
self.r = pairing_config.oob.our_context.r self.r = pairing_config.oob.our_context.r
self.ecc_key = pairing_config.oob.our_context.ecc_key self.ecc_key = pairing_config.oob.our_context.ecc_key
if pairing_config.oob.legacy_context is None: if pairing_config.oob.legacy_context is not None:
self.tk = None
else:
self.tk = pairing_config.oob.legacy_context.tk self.tk = pairing_config.oob.legacy_context.tk
else: else:
if pairing_config.oob.legacy_context is None: if pairing_config.oob.legacy_context is None:
@@ -807,7 +807,7 @@ class Session:
@property @property
def pkx(self) -> Tuple[bytes, bytes]: def pkx(self) -> Tuple[bytes, bytes]:
return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x) return (self.ecc_key.x[::-1], self.peer_public_key_x)
@property @property
def pka(self) -> bytes: def pka(self) -> bytes:
@@ -1061,8 +1061,8 @@ class Session:
def send_public_key_command(self) -> None: def send_public_key_command(self) -> None:
self.send_command( self.send_command(
SMP_Pairing_Public_Key_Command( SMP_Pairing_Public_Key_Command(
public_key_x=bytes(reversed(self.ecc_key.x)), public_key_x=self.ecc_key.x[::-1],
public_key_y=bytes(reversed(self.ecc_key.y)), public_key_y=self.ecc_key.y[::-1],
) )
) )
@@ -1098,15 +1098,52 @@ class Session:
) )
) )
async def derive_ltk(self) -> None: @classmethod
link_key = await self.manager.device.get_link_key(self.connection.peer_address) def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
assert link_key is not None '''Derives Long Term Key from Link Key.
Args:
link_key: BR/EDR Link Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
LE Long Tern Key bytes in little-endian.
'''
ilk = ( ilk = (
crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key) crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
if self.ct2 if ct2
else crypto.h6(link_key, b'tmp2') else crypto.h6(link_key, b'tmp2')
) )
self.ltk = crypto.h6(ilk, b'brle') return crypto.h6(ilk, b'brle')
@classmethod
def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
'''Derives Link Key from Long Term Key.
Args:
ltk: LE Long Term Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
BR/EDR Link Key bytes in little-endian.
'''
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
if ct2
else crypto.h6(ltk, b'tmp1')
)
return crypto.h6(ilk, b'lebr')
async def get_link_key_and_derive_ltk(self) -> None:
'''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
if link_key is None:
logging.warning(
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
)
self.send_pairing_failed(
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
)
else:
self.ltk = self.derive_ltk(link_key, self.ct2)
def distribute_keys(self) -> None: def distribute_keys(self) -> None:
# Distribute the keys as required # Distribute the keys as required
@@ -1117,7 +1154,7 @@ class Session:
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
): ):
self.ctkd_task = self.connection.abort_on( self.ctkd_task = self.connection.abort_on(
'disconnection', self.derive_ltk() 'disconnection', self.get_link_key_and_derive_ltk()
) )
elif not self.sc: elif not self.sc:
# Distribute the LTK, EDIV and RAND # Distribute the LTK, EDIV and RAND
@@ -1147,12 +1184,7 @@ class Session:
# CTKD, calculate BR/EDR link key # CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = ( self.link_key = self.derive_link_key(self.ltk, self.ct2)
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
else: else:
# CTKD: Derive LTK from LinkKey # CTKD: Derive LTK from LinkKey
@@ -1161,7 +1193,7 @@ class Session:
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
): ):
self.ctkd_task = self.connection.abort_on( self.ctkd_task = self.connection.abort_on(
'disconnection', self.derive_ltk() 'disconnection', self.get_link_key_and_derive_ltk()
) )
# Distribute the LTK, EDIV and RAND # Distribute the LTK, EDIV and RAND
elif not self.sc: elif not self.sc:
@@ -1191,12 +1223,7 @@ class Session:
# CTKD, calculate BR/EDR link key # CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = ( self.link_key = self.derive_link_key(self.ltk, self.ct2)
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None: def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
# Set our expectations for what to wait for in the key distribution phase # Set our expectations for what to wait for in the key distribution phase
@@ -1754,14 +1781,10 @@ class Session:
self.peer_public_key_y = command.public_key_y self.peer_public_key_y = command.public_key_y
# Compute the DH key # Compute the DH key
self.dh_key = bytes( self.dh_key = self.ecc_key.dh(
reversed( command.public_key_x[::-1],
self.ecc_key.dh( command.public_key_y[::-1],
bytes(reversed(command.public_key_x)), )[::-1]
bytes(reversed(command.public_key_y)),
)
)
)
logger.debug(f'DH key: {self.dh_key.hex()}') logger.debug(f'DH key: {self.dh_key.hex()}')
if self.pairing_method == PairingMethod.OOB: if self.pairing_method == PairingMethod.OOB:
@@ -1824,7 +1847,6 @@ class Session:
else: else:
self.send_pairing_dhkey_check_command() self.send_pairing_dhkey_check_command()
else: else:
assert self.ltk
self.start_encryption(self.ltk) self.start_encryption(self.ltk)
def on_smp_pairing_failed_command( def on_smp_pairing_failed_command(
@@ -1874,6 +1896,7 @@ class Manager(EventEmitter):
sessions: Dict[int, Session] sessions: Dict[int, Session]
pairing_config_factory: Callable[[Connection], PairingConfig] pairing_config_factory: Callable[[Connection], PairingConfig]
session_proxy: Type[Session] session_proxy: Type[Session]
_ecc_key: Optional[crypto.EccKey]
def __init__( def __init__(
self, self,
+1 -1
View File
@@ -150,7 +150,7 @@ class PacketParser:
try: try:
self.sink.on_packet(bytes(self.packet)) self.sink.on_packet(bytes(self.packet))
except Exception as error: except Exception as error:
logger.warning( logger.exception(
color(f'!!! Exception in on_packet: {error}', 'red') color(f'!!! Exception in on_packet: {error}', 'red')
) )
self.reset() self.reset()
+58 -61
View File
@@ -24,9 +24,10 @@ import platform
import usb1 import usb1
from .common import Transport, ParserSource from bumble.transport.common import Transport, ParserSource
from .. import hci from bumble import hci
from ..colors import color from bumble.colors import color
from bumble.utils import AsyncRunner
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -113,7 +114,7 @@ async def open_usb_transport(spec: str) -> Transport:
def __init__(self, device, acl_out): def __init__(self, device, acl_out):
self.device = device self.device = device
self.acl_out = acl_out self.acl_out = acl_out
self.transfer = device.getTransfer() self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.cancel_done = self.loop.create_future() self.cancel_done = self.loop.create_future()
@@ -137,21 +138,20 @@ async def open_usb_transport(spec: str) -> Transport:
# The queue was previously empty, re-prime the pump # The queue was previously empty, re-prime the pump
self.process_queue() self.process_queue()
def on_packet_sent(self, transfer): def transfer_callback(self, transfer):
status = transfer.getStatus() status = transfer.getStatus()
# logger.debug(f'<<< USB out transfer callback: status={status}')
# pylint: disable=no-member # pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED: if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent_) self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED: elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None) self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else: else:
logger.warning( logger.warning(
color(f'!!! out transfer not completed: status={status}', 'red') color(f'!!! OUT transfer not completed: status={status}', 'red')
) )
def on_packet_sent_(self): def on_packet_sent(self):
if self.packets: if self.packets:
self.packets.popleft() self.packets.popleft()
self.process_queue() self.process_queue()
@@ -163,22 +163,20 @@ async def open_usb_transport(spec: str) -> Transport:
packet = self.packets[0] packet = self.packets[0]
packet_type = packet[0] packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET: if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk( self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.on_packet_sent self.acl_out, packet[1:], callback=self.transfer_callback
) )
logger.debug('submit ACL') self.acl_out_transfer.submit()
self.transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET: elif packet_type == hci.HCI_COMMAND_PACKET:
self.transfer.setControl( self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0, 0,
0, 0,
0, 0,
packet[1:], packet[1:],
callback=self.on_packet_sent, callback=self.transfer_callback,
) )
logger.debug('submit COMMAND') self.acl_out_transfer.submit()
self.transfer.submit()
else: else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red')) logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
@@ -193,11 +191,11 @@ async def open_usb_transport(spec: str) -> Transport:
self.packets.clear() self.packets.clear()
# If we have a transfer in flight, cancel it # If we have a transfer in flight, cancel it
if self.transfer.isSubmitted(): if self.acl_out_transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have # Try to cancel the transfer, but that may fail because it may have
# already completed # already completed
try: try:
self.transfer.cancel() self.acl_out_transfer.cancel()
logger.debug('waiting for OUT transfer cancellation to be done...') logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done await self.cancel_done
@@ -206,27 +204,22 @@ async def open_usb_transport(spec: str) -> Transport:
logger.debug('OUT transfer likely already completed') logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource): class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, metadata, acl_in, events_in): def __init__(self, device, metadata, acl_in, events_in):
super().__init__() super().__init__()
self.context = context
self.device = device self.device = device
self.metadata = metadata self.metadata = metadata
self.acl_in = acl_in self.acl_in = acl_in
self.acl_in_transfer = None
self.events_in = events_in self.events_in = events_in
self.events_in_transfer = None
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
self.dequeue_task = None self.dequeue_task = None
self.closed = False
self.event_loop_done = self.loop.create_future()
self.cancel_done = { self.cancel_done = {
hci.HCI_EVENT_PACKET: self.loop.create_future(), hci.HCI_EVENT_PACKET: self.loop.create_future(),
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(), hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
} }
self.events_in_transfer = None self.closed = False
self.acl_in_transfer = None
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
def start(self): def start(self):
# Set up transfer objects for input # Set up transfer objects for input
@@ -234,7 +227,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.events_in_transfer.setInterrupt( self.events_in_transfer.setInterrupt(
self.events_in, self.events_in,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET, user_data=hci.HCI_EVENT_PACKET,
) )
self.events_in_transfer.submit() self.events_in_transfer.submit()
@@ -243,22 +236,23 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_in_transfer.setBulk( self.acl_in_transfer.setBulk(
self.acl_in, self.acl_in,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET, user_data=hci.HCI_ACL_DATA_PACKET,
) )
self.acl_in_transfer.submit() self.acl_in_transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue()) self.dequeue_task = self.loop.create_task(self.dequeue())
self.event_thread.start()
def on_packet_received(self, transfer): @property
def usb_transfer_submitted(self):
return (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
)
def transfer_callback(self, transfer):
packet_type = transfer.getUserData() packet_type = transfer.getUserData()
status = transfer.getStatus() status = transfer.getStatus()
# logger.debug(
# f'<<< USB IN transfer callback: status={status} '
# f'packet_type={packet_type} '
# f'length={transfer.getActualLength()}'
# )
# pylint: disable=no-member # pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED: if status == usb1.TRANSFER_COMPLETED:
@@ -267,18 +261,18 @@ async def open_usb_transport(spec: str) -> Transport:
+ transfer.getBuffer()[: transfer.getActualLength()] + transfer.getBuffer()[: transfer.getActualLength()]
) )
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet) self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
# Re-submit the transfer so we can receive more data
transfer.submit()
elif status == usb1.TRANSFER_CANCELLED: elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe( self.loop.call_soon_threadsafe(
self.cancel_done[packet_type].set_result, None self.cancel_done[packet_type].set_result, None
) )
return
else: else:
logger.warning( logger.warning(
color(f'!!! transfer not completed: status={status}', 'red') color(f'!!! IN transfer not completed: status={status}', 'red')
) )
self.loop.call_soon_threadsafe(self.on_transport_lost)
# Re-submit the transfer so we can receive more data
transfer.submit()
async def dequeue(self): async def dequeue(self):
while not self.closed: while not self.closed:
@@ -288,21 +282,6 @@ async def open_usb_transport(spec: str) -> Transport:
return return
self.parser.feed_data(packet) self.parser.feed_data(packet)
def run(self):
logger.debug('starting USB event loop')
while (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
):
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
def close(self): def close(self):
self.closed = True self.closed = True
@@ -331,15 +310,14 @@ async def open_usb_transport(spec: str) -> Transport:
f'IN[{packet_type}] transfer likely already completed' f'IN[{packet_type}] transfer likely already completed'
) )
# Wait for the thread to terminate
await self.event_loop_done
class UsbTransport(Transport): class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink): def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink) super().__init__(source, sink)
self.context = context self.context = context
self.device = device self.device = device
self.interface = interface self.interface = interface
self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future()
# Get exclusive access # Get exclusive access
device.claimInterface(interface) device.claimInterface(interface)
@@ -352,6 +330,22 @@ async def open_usb_transport(spec: str) -> Transport:
source.start() source.start()
sink.start() sink.start()
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.event_thread.start()
def run(self):
logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted:
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self): async def close(self):
self.source.close() self.source.close()
self.sink.close() self.sink.close()
@@ -361,6 +355,9 @@ async def open_usb_transport(spec: str) -> Transport:
self.device.close() self.device.close()
self.context.close() self.context.close()
# Wait for the thread to terminate
await self.event_loop_done
# Find the device according to the spec moniker # Find the device according to the spec moniker
load_libusb() load_libusb()
context = usb1.USBContext() context = usb1.USBContext()
@@ -540,7 +537,7 @@ async def open_usb_transport(spec: str) -> Transport:
except usb1.USBError: except usb1.USBError:
logger.warning('failed to set configuration') logger.warning('failed to set configuration')
source = UsbPacketSource(context, device, device_metadata, acl_in, events_in) source = UsbPacketSource(device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out) sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink) return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error: except usb1.USBError as error:
+17 -1
View File
@@ -432,7 +432,7 @@ def wrap_async(function):
def deprecated(msg: str): def deprecated(msg: str):
""" """
Throw deprecation warning before execution Throw deprecation warning before execution.
""" """
def wrapper(function): def wrapper(function):
@@ -444,3 +444,19 @@ def deprecated(msg: str):
return inner return inner
return wrapper return wrapper
def experimental(msg: str):
"""
Throws a future warning before execution.
"""
def wrapper(function):
@wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)
return inner
return wrapper
+53 -13
View File
@@ -1,19 +1,19 @@
ANDROID REMOTE HCI APP ANDROID REMOTE HCI APP
====================== ======================
This application allows using an android phone's built-in Bluetooth controller with This application allows using an android phone's built-in Bluetooth controller with
a Bumble host stack running outside the phone (typically a development laptop or desktop). a Bumble host stack running outside the phone (typically a development laptop or desktop).
The app runs an HCI proxy between a TCP socket on the "outside" and the Bluetooth HCI HAL The app runs an HCI proxy between a TCP socket on the "outside" and the Bluetooth HCI HAL
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
description of the Android Bluetooth HCI HAL). description of the Android Bluetooth HCI HAL).
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
packets coming from the controller are forwarded to the TCP socket. packets coming from the controller are forwarded to the TCP socket.
Building Building
-------- --------
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `RemoteHCI` top level directory. You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `extras/android/RemoteHCI` top level directory.
You can also build with Android Studio: open the `RemoteHCI` project. You can build and/or debug from there. You can also build with Android Studio: open the `RemoteHCI` project. You can build and/or debug from there.
If the build succeeds, you can find the app APKs (debug and release) at: If the build succeeds, you can find the app APKs (debug and release) at:
@@ -25,9 +25,23 @@ If the build succeeds, you can find the app APKs (debug and release) at:
Running Running
------- -------
!!! note
In the following examples, it is assumed that shell commands are executed while in the
app's root directory, `extras/android/RemoteHCI`. If you are in a different directory,
adjust the relative paths accordingly.
### Preconditions ### Preconditions
When the proxy starts (tapping the "Start" button in the app's main activity), it will try to When the proxy starts (tapping the "Start" button in the app's main activity, or running the proxy
bind to the Bluetooth HAL. This requires disabling SELinux temporarily, and being the only HAL client. from an `adb shell` command line), it will try to bind to the Bluetooth HAL.
This requires that there is no other HAL client, and requires certain privileges.
For running as a regular app, this requires disabling SELinux temporarily.
For running as a command-line executable, this just requires a root shell.
#### Root Shell
!!! tip "Restart `adb` as root"
```bash
$ adb root
```
#### Disabling SELinux #### Disabling SELinux
Binding to the Bluetooth HCI HAL requires certain SELinux permissions that can't simply be changed Binding to the Bluetooth HCI HAL requires certain SELinux permissions that can't simply be changed
@@ -56,8 +70,8 @@ development phone).
This state will also reset to the normal SELinux enforcement when you reboot. This state will also reset to the normal SELinux enforcement when you reboot.
#### Stopping the bluetooth process #### Stopping the bluetooth process
Since the Bluetooth HAL service can only accept one client, and that in normal conditions Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the that client is the Android's bluetooth stack, it is required to first shut down the
Android bluetooth stack process. Android bluetooth stack process.
!!! tip "Checking if the Bluetooth process is running" !!! tip "Checking if the Bluetooth process is running"
@@ -79,7 +93,33 @@ Airplane Mode, then rebooting. The bluetooth process should, in theory, not rest
$ adb shell cmd bluetooth_manager disable $ adb shell cmd bluetooth_manager disable
``` ```
### Starting the app ### Running as a command line app
You push the built APK to a temporary location on the phone's filesystem, then launch the command
line executable with an `adb shell` command.
!!! tip "Pushing the executable"
```bash
$ adb push app/build/outputs/apk/release/app-release-unsigned.apk /data/local/tmp/remotehci.apk
```
Do this every time you rebuild. Alternatively, you can push the `debug` APK instead:
```bash
$ adb push app/build/outputs/apk/debug/app-debug.apk /data/local/tmp/remotehci.apk
```
!!! tip "Start the proxy from the command line"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface"
```
This will run the proxy, listening on the default TCP port.
If you want a different port, pass it as a command line parameter
!!! tip "Start the proxy from the command line with a specific TCP port"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface 12345"
```
### Running as a normal app
You can start the app from the Android launcher, from Android Studio, or with `adb` You can start the app from the Android launcher, from Android Studio, or with `adb`
#### Launching from the launcher #### Launching from the launcher
@@ -103,11 +143,11 @@ automatically start the proxy, and/or set the port number.
#### Selecting a TCP port #### Selecting a TCP port
The RemoteHCI app's main activity has a "TCP Port" setting where you can change the port on The RemoteHCI app's main activity has a "TCP Port" setting where you can change the port on
which the proxy is accepting connections. If the default value isn't suitable, you can which the proxy is accepting connections. If the default value isn't suitable, you can
change it there (you can also use the special value 0 to let the OS assign a port number for you). change it there (you can also use the special value 0 to let the OS assign a port number for you).
### Connecting to the proxy ### Connecting to the proxy
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
stack. This can be done over the phone's WiFi connection, or, alternatively, using an `adb` stack. This can be done over the phone's WiFi connection, or, alternatively, using an `adb`
TCP forward (which should be faster than over WiFi). TCP forward (which should be faster than over WiFi).
@@ -116,7 +156,7 @@ TCP forward (which should be faster than over WiFi).
```bash ```bash
$ adb forward tcp:<outside-port> tcp:<inside-port> $ adb forward tcp:<outside-port> tcp:<inside-port>
``` ```
Where ``<outside-port>`` is the port number for a listening socket on your laptop or Where ``<outside-port>`` is the port number for a listening socket on your laptop or
desktop machine, and <inside-port> is the TCP port selected in the app's user interface. desktop machine, and <inside-port> is the TCP port selected in the app's user interface.
Those two ports may be the same, of course. Those two ports may be the same, of course.
For example, with the default TCP port 9993: For example, with the default TCP port 9993:
@@ -125,7 +165,7 @@ TCP forward (which should be faster than over WiFi).
``` ```
Once you've ensured that you can reach the proxy's TCP port on the phone, either directly or Once you've ensured that you can reach the proxy's TCP port on the phone, either directly or
via an `adb` forward, you can then use it as a Bumble transport, using the transport name: via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
``tcp-client:<host>:<port>`` syntax. ``tcp-client:<host>:<port>`` syntax.
!!! example "Connecting a Bumble client" !!! example "Connecting a Bumble client"
+5
View File
@@ -0,0 +1,5 @@
{
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"advertising_interval": 100
}
+107
View File
@@ -0,0 +1,107 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import (
Device,
Connection,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_cig_setup.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_cig_setup.py device1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].cis_enabled = True
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC
)
cid_ids = [2, 3]
cis_handles = await devices[1].setup_cig(
cig_id=1,
cis_id=cid_ids,
sdu_interval=(10000, 0),
framing=0,
max_sdu=(120, 0),
retransmission_number=13,
max_transport_latency=(100, 0),
)
def on_cis_request(
connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int
):
connection.abort_on('disconnection', devices[0].accept_cis_request(cis_handle))
devices[0].on('cis_request', on_cis_request)
cis_links = await devices[1].create_cis(
[(cis, connection.handle) for cis in cis_handles]
)
for cis_link in cis_links:
await cis_link.disconnect()
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+87
View File
@@ -0,0 +1,87 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import dataclasses
import logging
import sys
import os
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Device, ScoLink
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hfp import DefaultCodecParameters, ESCO_PARAMETERS
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_esco_connection.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_esco_connection.py classic1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
connections = await asyncio.gather(
devices[0].accept(devices[1].public_address),
devices[1].connect(devices[0].public_address, transport=BT_BR_EDR_TRANSPORT),
)
def on_sco(sco_link: ScoLink):
connections[0].abort_on('disconnection', sco_link.disconnect())
devices[0].once('sco_connection', on_sco)
await devices[0].send_command(
HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(),
# type: ignore[call-args]
)
)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+69
View File
@@ -0,0 +1,69 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_extended_advertiser.py <config-file> <transport-spec> [type] [address]'
)
print('example: run_extended_advertiser.py device1.json usb:0')
return
if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
if len(sys.argv) >= 5:
target = Address(sys.argv[4])
else:
target = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
@@ -1,5 +1,5 @@
[versions] [versions]
agp = "8.3.0-alpha11" agp = "8.2.0"
kotlin = "1.9.0" kotlin = "1.9.0"
core-ktx = "1.12.0" core-ktx = "1.12.0"
junit = "4.13.2" junit = "4.13.2"
@@ -0,0 +1,57 @@
package com.github.google.bumble.remotehci
import java.io.IOException
class CommandLineInterface {
companion object {
fun printUsage() {
System.out.println("usage: <launch-command> [-h|--help] [<tcp-port>]")
}
@JvmStatic fun main(args: Array<String>) {
System.out.println("Starting proxy")
var tcpPort = DEFAULT_TCP_PORT
if (args.isNotEmpty()) {
if (args[0] == "-h" || args[0] == "--help") {
printUsage()
return
}
try {
tcpPort = args[0].toInt()
} catch (error: NumberFormatException) {
System.out.println("ERROR: invalid TCP port argument")
printUsage()
return
}
}
try {
val hciProxy = HciProxy(tcpPort, object : HciProxy.Listener {
override fun onHostConnectionState(connected: Boolean) {
}
override fun onHciPacketCountChange(
commandPacketsReceived: Int,
aclPacketsReceived: Int,
scoPacketsReceived: Int,
eventPacketsSent: Int,
aclPacketsSent: Int,
scoPacketsSent: Int
) {
}
override fun onMessage(message: String?) {
System.out.println(message)
}
})
hciProxy.run()
} catch (error: IOException) {
System.err.println("Exception while running HCI Server: $error")
} catch (error: HciProxy.HalException) {
System.err.println("HAL exception: ${error.message}")
}
}
}
}
@@ -1,5 +1,5 @@
[versions] [versions]
agp = "8.3.0-alpha11" agp = "8.2.0"
kotlin = "1.8.10" kotlin = "1.8.10"
core-ktx = "1.9.0" core-ktx = "1.9.0"
junit = "4.13.2" junit = "4.13.2"
+2 -2
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC # Copyright 2021-2023 Google LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ url = https://github.com/google/bumble
[options] [options]
python_requires = >=3.8 python_requires = >=3.8
packages = bumble, bumble.transport, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools packages = bumble, bumble.transport, bumble.transport.grpc_protobuf, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools
package_dir = package_dir =
bumble = bumble bumble = bumble
bumble.apps = apps bumble.apps = apps
+74
View File
@@ -0,0 +1,74 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import pytest
import struct
import logging
from bumble import device
from bumble.profiles import csip
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_csis():
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
devices = TwoDevices()
devices[0].add_service(
csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK,
coordinated_set_size=2,
set_member_lock=csip.MemberLock.UNLOCKED,
set_member_rank=0,
)
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
csis_client = await peer.discover_service_and_create_proxy(
csip.CoordinatedSetIdentificationProxy
)
assert (
await csis_client.set_identity_resolving_key.read_value()
== bytes([csip.SirkType.PLAINTEXT]) + SIRK
)
assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2)
assert await csis_client.set_member_lock.read_value() == struct.pack(
'B', csip.MemberLock.UNLOCKED
)
assert await csis_client.set_member_rank.read_value() == struct.pack('B', 0)
# -----------------------------------------------------------------------------
async def run():
await test_csis()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
+79
View File
@@ -20,6 +20,7 @@ import logging
import os import os
import struct import struct
import pytest import pytest
from unittest.mock import Mock, ANY
from bumble.controller import Controller from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy from bumble.gatt_client import CharacteristicProxy
@@ -763,6 +764,83 @@ async def test_subscribe_notify():
assert not c3._called_3 assert not c3._called_3
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unsubscribe():
[client, server] = LinkedDevices().devices[:2]
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
characteristic2 = Characteristic(
'3234C4F4-3F34-4616-8935-45A50EE05DEB',
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
service1 = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[characteristic1, characteristic2],
)
server.add_services([service1])
mock1 = Mock()
characteristic1.on('subscription', mock1)
mock2 = Mock()
characteristic2.on('subscription', mock2)
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1
c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert len(c) == 1
c2 = c[0]
await c1.subscribe()
await async_barrier()
mock1.assert_called_once_with(ANY, True, False)
await c2.subscribe()
await async_barrier()
mock2.assert_called_once_with(ANY, True, False)
mock1.reset_mock()
await c1.unsubscribe()
await async_barrier()
mock1.assert_called_once_with(ANY, False, False)
mock2.reset_mock()
await c2.unsubscribe()
await async_barrier()
mock2.assert_called_once_with(ANY, False, False)
mock1.reset_mock()
await c1.unsubscribe()
await async_barrier()
mock1.assert_not_called()
mock2.reset_mock()
await c2.unsubscribe()
await async_barrier()
mock2.assert_not_called()
mock1.reset_mock()
await c1.unsubscribe(force=True)
await async_barrier()
mock1.assert_called_once_with(ANY, False, False)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_mtu_exchange(): async def test_mtu_exchange():
@@ -886,6 +964,7 @@ async def async_main():
await test_read_write() await test_read_write()
await test_read_write2() await test_read_write2()
await test_subscribe_notify() await test_subscribe_notify()
await test_unsubscribe()
await test_characteristic_encoding() await test_characteristic_encoding()
await test_mtu_exchange() await test_mtu_exchange()
+16
View File
@@ -24,6 +24,8 @@ from bumble.hci import (
HCI_RESET_COMMAND, HCI_RESET_COMMAND,
HCI_SUCCESS, HCI_SUCCESS,
Address, Address,
CodingFormat,
CodecID,
HCI_Command, HCI_Command,
HCI_Command_Complete_Event, HCI_Command_Complete_Event,
HCI_Command_Status_Event, HCI_Command_Status_Event,
@@ -442,6 +444,20 @@ def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
basic_check(command) basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Setup_ISO_Data_Path_Command():
command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000'))
assert command.connection_handle == 0x0060
assert command.data_path_direction == 0x00
assert command.data_path_id == 0x01
assert command.codec_id == CodingFormat(CodecID.TRANSPARENT)
assert command.controller_delay == 0
assert command.codec_configuration == b''
basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_address(): def test_address():
a = Address('C4:F2:17:1A:1D:BB') a = Address('C4:F2:17:1A:1D:BB')
+3 -12
View File
@@ -21,7 +21,7 @@ import logging
import os import os
import pytest import pytest
from unittest.mock import MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
from bumble.controller import Controller from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
@@ -38,7 +38,6 @@ from bumble.smp import (
OobLegacyContext, OobLegacyContext,
) )
from bumble.core import ProtocolError from bumble.core import ProtocolError
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
from bumble.keys import PairingKeys from bumble.keys import PairingKeys
@@ -519,16 +518,8 @@ async def test_self_smp_over_classic():
# Mock connection # Mock connection
# TODO: Implement Classic SSP and encryption in link relayer # TODO: Implement Classic SSP and encryption in link relayer
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835') LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
two_devices.devices[0].on_link_key( two_devices.devices[0].get_link_key = AsyncMock(return_value=LINK_KEY)
two_devices.devices[1].public_address, two_devices.devices[1].get_link_key = AsyncMock(return_value=LINK_KEY)
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.devices[1].on_link_key(
two_devices.devices[0].public_address,
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.connections[0].encryption = 1 two_devices.connections[0].encryption = 1
two_devices.connections[1].encryption = 1 two_devices.connections[1].encryption = 1
+68 -72
View File
@@ -16,6 +16,9 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import pytest
from bumble import smp
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1 from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.pairing import OobData, OobSharedData, LeRole from bumble.pairing import OobData, OobSharedData, LeRole
from bumble.hci import Address from bumble.hci import Address
@@ -28,8 +31,8 @@ from bumble.core import AdvertisingData
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def reversed_hex(hex_str): def reversed_hex(hex_str: str) -> bytes:
return bytes(reversed(bytes.fromhex(hex_str))) return bytes.fromhex(hex_str)[::-1]
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -129,112 +132,79 @@ def test_aes_cmac():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_f4(): def test_f4():
u = bytes( u = reversed_hex(
reversed( '20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
+ 'eff49111 acf4fddb cc030148 0e359de6'
)
)
) )
v = bytes( v = reversed_hex(
reversed( '55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
bytes.fromhex(
'55188b3d 32f6bb9a 900afcfb eed4e72a'
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
)
) )
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
z = bytes([0]) z = b'\0'
value = f4(u, v, x, z) value = f4(u, v, x, z)
assert bytes(reversed(value)) == bytes.fromhex( assert value == reversed_hex('f2c916f1 07a9bd1c f1eda1be a974872d')
'f2c916f1 07a9bd1c f1eda1be a974872d'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_f5(): def test_f5():
w = bytes( w = reversed_hex(
reversed( 'ec0234a3 57c8ad05 341010a6 0a397d9b 99796b13 b4f866f1 868d34f3 73bfa698'
bytes.fromhex(
'ec0234a3 57c8ad05 341010a6 0a397d9b'
+ '99796b13 b4f866f1 868d34f3 73bfa698'
)
)
) )
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce'))) a1 = reversed_hex('00561237 37bfce')
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1'))) a2 = reversed_hex('00a71370 2dcfc1')
value = f5(w, n1, n2, a1, a2) value = f5(w, n1, n2, a1, a2)
assert bytes(reversed(value[0])) == bytes.fromhex( assert value[0] == reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
'2965f176 a1084a02 fd3f6a20 ce636e20' assert value[1] == reversed_hex('69867911 69d7cd23 980522b5 94750a38')
)
assert bytes(reversed(value[1])) == bytes.fromhex(
'69867911 69d7cd23 980522b5 94750a38'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_f6(): def test_f6():
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
mac_key = bytes(reversed(bytes.fromhex('2965f176 a1084a02 fd3f6a20 ce636e20'))) mac_key = reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
r = bytes(reversed(bytes.fromhex('12a3343b b453bb54 08da42d2 0c2d0fc8'))) r = reversed_hex('12a3343b b453bb54 08da42d2 0c2d0fc8')
io_cap = bytes(reversed(bytes.fromhex('010102'))) io_cap = reversed_hex('010102')
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce'))) a1 = reversed_hex('00561237 37bfce')
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1'))) a2 = reversed_hex('00a71370 2dcfc1')
value = f6(mac_key, n1, n2, r, io_cap, a1, a2) value = f6(mac_key, n1, n2, r, io_cap, a1, a2)
assert bytes(reversed(value)) == bytes.fromhex( assert value == reversed_hex('e3c47398 9cd0e8c5 d26c0b09 da958f61')
'e3c47398 9cd0e8c5 d26c0b09 da958f61'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_g2(): def test_g2():
u = bytes( u = reversed_hex(
reversed( '20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
+ 'eff49111 acf4fddb cc030148 0e359de6'
)
)
) )
v = bytes( v = reversed_hex(
reversed( '55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
bytes.fromhex(
'55188b3d 32f6bb9a 900afcfb eed4e72a'
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
)
) )
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
y = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) y = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
value = g2(u, v, x, y) value = g2(u, v, x, y)
assert value == 0x2F9ED5BA assert value == 0x2F9ED5BA
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_h6(): def test_h6():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b') KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY_ID = bytes.fromhex('6c656272') KEY_ID = bytes.fromhex('6c656272')
assert h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399') assert h6(KEY, KEY_ID) == reversed_hex('2d9ae102 e76dc91c e8d3a9e2 80b16399')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_h7(): def test_h7():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b') KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
SALT = bytes.fromhex('00000000 00000000 00000000 746D7031') SALT = bytes.fromhex('00000000 00000000 00000000 746D7031')
assert h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011') assert h7(SALT, KEY) == reversed_hex('fb173597 c6a3c0ec d2998c2a 75a57011')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_ah(): def test_ah():
irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b'))) irk = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
prand = bytes(reversed(bytes.fromhex('708194'))) prand = reversed_hex('708194')
value = ah(irk, prand) value = ah(irk, prand)
expected = bytes(reversed(bytes.fromhex('0dfbaa'))) expected = reversed_hex('0dfbaa')
assert value == expected assert value == expected
@@ -243,7 +213,7 @@ def test_oob_data():
oob_data = OobData( oob_data = OobData(
address=Address("F0:F1:F2:F3:F4:F5"), address=Address("F0:F1:F2:F3:F4:F5"),
role=LeRole.BOTH_PERIPHERAL_PREFERRED, role=LeRole.BOTH_PERIPHERAL_PREFERRED,
shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])), shared_data=OobSharedData(c=b'12', r=b'34'),
) )
oob_data_ad = oob_data.to_ad() oob_data_ad = oob_data.to_ad()
oob_data_bytes = bytes(oob_data_ad) oob_data_bytes = bytes(oob_data_ad)
@@ -255,6 +225,32 @@ def test_oob_data():
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'ct2, expected',
[
(False, 'bc1ca4ef 633fc1bd 0d8230af ee388fb0'),
(True, '287ad379 dca40253 0a39f1f4 3047b835'),
],
)
def test_ltk_to_link_key(ct2: bool, expected: str):
LTK = reversed_hex('368df9bc e3264b58 bd066c33 334fbf64')
assert smp.Session.derive_link_key(LTK, ct2) == reversed_hex(expected)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'ct2, expected',
[
(False, 'a813fb72 f1a3dfa1 8a2c9a43 f10d0a30'),
(True, 'e85e09eb 5eccb3e2 69418a13 3211bc79'),
],
)
def test_link_key_to_ltk(ct2: bool, expected: str):
LINK_KEY = reversed_hex('05040302 01000908 07060504 03020100')
assert smp.Session.derive_ltk(LINK_KEY, ct2) == reversed_hex(expected)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_ecc() test_ecc()
+3
View File
@@ -71,3 +71,6 @@ class TwoDevices:
# Check the post conditions # Check the post conditions
assert self.connections[0] is not None assert self.connections[0] is not None
assert self.connections[1] is not None assert self.connections[1] is not None
def __getitem__(self, index: int) -> Device:
return self.devices[index]