Compare commits

...

38 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 8400ff0802 shared usage printer 2023-12-04 00:37:28 -08:00
Gilles Boccon-Gibod 0ed6aa230b address PR comment 2023-12-04 00:32:04 -08:00
Gilles Boccon-Gibod 72d5360af9 keep projects compatible with Android Studio Hedgehog 2023-12-03 18:06:54 -08:00
Gilles Boccon-Gibod ac3961e763 add doc 2023-12-03 17:50:42 -08:00
Gilles Boccon-Gibod 8385035400 add CLI support 2023-12-03 16:35:14 -08:00
zxzxwu 247cb89332 Merge pull request #358 from zxzxwu/coding2
Add variable-length bytes field
2023-12-01 03:26:38 +08:00
Josh Wu 3fc71a0266 Add variable-length bytes field 2023-12-01 03:16:52 +08:00
zxzxwu 392dcc3a05 Merge pull request #357 from zxzxwu/coding
Refactor CodingFormat
2023-12-01 03:15:33 +08:00
Josh Wu f27015d1b7 Refactor CodingFormat
As CodingFormat is now used by HFP and LEA, and vendor specific codecs
are introduced, this object needs to provide more information.
2023-12-01 02:58:09 +08:00
zxzxwu 86a19b41aa Merge pull request #344 from zxzxwu/cis
CIS and SCO responder support
2023-11-30 21:00:55 +08:00
Gilles Boccon-Gibod 320164d476 Merge pull request #355 from google/gbg/fix-gatt-unsubscribe
fix #354 (gatt unsubscribe)
2023-11-29 22:28:57 -08:00
Josh Wu 40ae661ee5 More SCO support and warnings and typo fix 2023-11-30 12:59:43 +08:00
Josh Wu c5def93bb8 CIS and SCO responder support 2023-11-30 12:16:40 +08:00
zxzxwu a9c4c5833d Merge pull request #350 from zxzxwu/csip
Add Coordinated Set Identification Service(CSIS)
2023-11-30 12:15:56 +08:00
Gilles Boccon-Gibod 58c9c4f590 fix #354 2023-11-29 19:19:40 -08:00
zxzxwu 24524d88cb Merge pull request #342 from zxzxwu/typing
Typing helper
2023-11-30 00:21:44 +08:00
zxzxwu b8849ab311 Merge pull request #349 from zxzxwu/stack
Log track back in on_packet
2023-11-30 00:20:20 +08:00
Josh Wu f3cd8f8ed0 Typing helper 2023-11-29 21:24:27 +08:00
zxzxwu 2b26de3f3a Merge pull request #348 from zxzxwu/gattc
Typing GATT Client and Device Peer
2023-11-29 15:09:40 +08:00
Josh Wu 0149c4c212 Log track back in on_packet
Many errors are raised in on_packet() callbacks, but currently it only
provides a very brief error message.
2023-11-29 15:01:15 +08:00
Gilles Boccon-Gibod f2ed898784 Merge pull request #352 from google/gbg/more-gatt-uuids
add a few uuids
2023-11-28 22:44:40 -08:00
Josh Wu 464a476f9f Add CSIP 2023-11-29 14:09:31 +08:00
Gilles Boccon-Gibod e85d067fb5 add a few uuids 2023-11-28 20:02:00 -08:00
Josh Wu 04d5bf3afc Typing GATT Client and Device Peer 2023-11-28 21:57:57 +08:00
zxzxwu a13e193d3b Merge pull request #343 from zxzxwu/lea-gatt
Add LE Audio GATT services and characteristics definitions
2023-11-28 10:34:39 +08:00
Gilles Boccon-Gibod 28a1a5ebc2 Merge pull request #347 from akuker/main
Include transport.grpc_protobuf in the setup package.
2023-11-27 15:23:17 -08:00
Tony Kuker 6310dc777f Include transport.grpc_protobuf in the setup package. 2023-11-27 16:48:37 -06:00
Josh Wu 863de18877 Add LE Audio GATT definitions 2023-11-27 17:53:00 +08:00
zxzxwu f0e5cdee1a Merge pull request #339 from zxzxwu/enc
Refactor crypto and fix CTKD
2023-11-27 14:05:37 +08:00
zxzxwu 7bc7d0f5af Merge pull request #334 from zxzxwu/extadv
Add support for LE Extended Advertising
2023-11-27 14:01:31 +08:00
Josh Wu a65a215fd7 Provide IntFlag.name property fallback 2023-11-26 19:42:22 +08:00
Josh Wu 80d34a226d Slightly refactor and fix CTKD
It seems sample input data provided in the spec is big-endian (just
like other AES-CMAC-based functions), but all keys are in little-endian(
HCI standard), so they need to be reverse before and after applying
AES-CMAC.
2023-11-26 16:55:10 +08:00
Josh Wu a9628f73e3 Add support for Extended Advertising 2023-11-26 15:03:09 +08:00
Lucas Abel 9bf2e03354 device: set authenticated and sc state on AES encryption change 2023-11-23 06:39:55 +01:00
Gilles Boccon-Gibod 2900b93bb3 Merge pull request #120 from google/gbg/usb-cleanup
minor cleanup of the internals of the usb transport implementation
2023-11-22 17:18:23 -08:00
Gilles Boccon-Gibod 284cc8a321 Merge pull request #326 from google/gbg/android-benchmark-app
Android benchmarking app
2023-11-22 15:39:52 -08:00
Gilles Boccon-Gibod 9c7089c8ff terminate when unplugged 2023-11-19 11:36:38 -08:00
Gilles Boccon-Gibod a8ec1b0949 minor cleanup of the internals of the usb transport implementation 2023-11-15 17:26:21 -08:00
30 changed files with 2003 additions and 518 deletions
+6
View File
@@ -21,7 +21,9 @@
"cccds",
"cmac",
"CONNECTIONLESS",
"csip",
"csrcs",
"CVSD",
"datagram",
"DATALINK",
"delayreport",
@@ -29,6 +31,7 @@
"deregistration",
"dhkey",
"diversifier",
"endianness",
"Fitbit",
"GATTLINK",
"HANDSFREE",
@@ -38,12 +41,14 @@
"libc",
"libusb",
"MITM",
"MSBC",
"NDIS",
"netsim",
"NONBLOCK",
"NONCONN",
"OXIMETER",
"popleft",
"PRAND",
"protobuf",
"psms",
"pyee",
@@ -55,6 +60,7 @@
"SEID",
"seids",
"SERV",
"SIRK",
"ssrc",
"strerror",
"subband",
+82 -66
View File
@@ -21,6 +21,8 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import logging
import operator
@@ -29,11 +31,13 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric.ec import (
generate_private_key,
ECDH,
EllipticCurvePrivateKey,
EllipticCurvePublicNumbers,
EllipticCurvePrivateNumbers,
SECP256R1,
)
from cryptography.hazmat.primitives import cmac
from typing import Tuple
# -----------------------------------------------------------------------------
@@ -46,16 +50,18 @@ logger = logging.getLogger(__name__)
# Classes
# -----------------------------------------------------------------------------
class EccKey:
def __init__(self, private_key):
def __init__(self, private_key: EllipticCurvePrivateKey) -> None:
self.private_key = private_key
@classmethod
def generate(cls):
def generate(cls) -> EccKey:
private_key = generate_private_key(SECP256R1())
return cls(private_key)
@classmethod
def from_private_key_bytes(cls, d_bytes, x_bytes, y_bytes):
def from_private_key_bytes(
cls, d_bytes: bytes, x_bytes: bytes, y_bytes: bytes
) -> EccKey:
d = int.from_bytes(d_bytes, byteorder='big', signed=False)
x = int.from_bytes(x_bytes, byteorder='big', signed=False)
y = int.from_bytes(y_bytes, byteorder='big', signed=False)
@@ -65,7 +71,7 @@ class EccKey:
return cls(private_key)
@property
def x(self):
def x(self) -> bytes:
return (
self.private_key.public_key()
.public_numbers()
@@ -73,14 +79,14 @@ class EccKey:
)
@property
def y(self):
def y(self) -> bytes:
return (
self.private_key.public_key()
.public_numbers()
.y.to_bytes(32, byteorder='big')
)
def dh(self, public_key_x, public_key_y):
def dh(self, public_key_x: bytes, public_key_y: bytes) -> bytes:
x = int.from_bytes(public_key_x, byteorder='big', signed=False)
y = int.from_bytes(public_key_y, byteorder='big', signed=False)
public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key()
@@ -93,14 +99,23 @@ class EccKey:
# Functions
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def xor(x, y):
def xor(x: bytes, y: bytes) -> bytes:
assert len(x) == len(y)
return bytes(map(operator.xor, x, y))
# -----------------------------------------------------------------------------
def r():
def reverse(input: bytes) -> bytes:
'''
Returns bytes of input in reversed endianness.
'''
return input[::-1]
# -----------------------------------------------------------------------------
def r() -> bytes:
'''
Generate 16 bytes of random data
'''
@@ -108,20 +123,20 @@ def r():
# -----------------------------------------------------------------------------
def e(key, data):
def e(key: bytes, data: bytes) -> bytes:
'''
AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output.
See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e
'''
cipher = Cipher(algorithms.AES(bytes(reversed(key))), modes.ECB())
cipher = Cipher(algorithms.AES(reverse(key)), modes.ECB())
encryptor = cipher.encryptor()
return bytes(reversed(encryptor.update(bytes(reversed(data)))))
return reverse(encryptor.update(reverse(data)))
# -----------------------------------------------------------------------------
def ah(k, r): # pylint: disable=redefined-outer-name
def ah(k: bytes, r: bytes) -> bytes: # pylint: disable=redefined-outer-name
'''
See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah
'''
@@ -132,7 +147,16 @@ def ah(k, r): # pylint: disable=redefined-outer-name
# -----------------------------------------------------------------------------
def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-name
def c1(
k: bytes,
r: bytes,
preq: bytes,
pres: bytes,
iat: int,
rat: int,
ia: bytes,
ra: bytes,
) -> bytes: # pylint: disable=redefined-outer-name
'''
See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for
LE Legacy Pairing
@@ -144,7 +168,7 @@ def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-n
# -----------------------------------------------------------------------------
def s1(k, r1, r2):
def s1(k: bytes, r1: bytes, r2: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy
Pairing
@@ -154,7 +178,7 @@ def s1(k, r1, r2):
# -----------------------------------------------------------------------------
def aes_cmac(m, k):
def aes_cmac(m: bytes, k: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC
@@ -166,20 +190,16 @@ def aes_cmac(m, k):
# -----------------------------------------------------------------------------
def f4(u, v, x, z):
def f4(u: bytes, v: bytes, x: bytes, z: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value
Generation Function f4
'''
return bytes(
reversed(
aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + z, bytes(reversed(x)))
)
)
return reverse(aes_cmac(reverse(u) + reverse(v) + z, reverse(x)))
# -----------------------------------------------------------------------------
def f5(w, n1, n2, a1, a2):
def f5(w: bytes, n1: bytes, n2: bytes, a1: bytes, a2: bytes) -> Tuple[bytes, bytes]:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation
Function f5
@@ -187,87 +207,83 @@ def f5(w, n1, n2, a1, a2):
NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order
'''
salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE')
t = aes_cmac(bytes(reversed(w)), salt)
t = aes_cmac(reverse(w), salt)
key_id = bytes([0x62, 0x74, 0x6C, 0x65])
return (
bytes(
reversed(
aes_cmac(
bytes([0])
+ key_id
+ bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(a1))
+ bytes(reversed(a2))
+ bytes([1, 0]),
t,
)
reverse(
aes_cmac(
bytes([0])
+ key_id
+ reverse(n1)
+ reverse(n2)
+ reverse(a1)
+ reverse(a2)
+ bytes([1, 0]),
t,
)
),
bytes(
reversed(
aes_cmac(
bytes([1])
+ key_id
+ bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(a1))
+ bytes(reversed(a2))
+ bytes([1, 0]),
t,
)
reverse(
aes_cmac(
bytes([1])
+ key_id
+ reverse(n1)
+ reverse(n2)
+ reverse(a1)
+ reverse(a2)
+ bytes([1, 0]),
t,
)
),
)
# -----------------------------------------------------------------------------
def f6(w, n1, n2, r, io_cap, a1, a2): # pylint: disable=redefined-outer-name
def f6(
w: bytes, n1: bytes, n2: bytes, r: bytes, io_cap: bytes, a1: bytes, a2: bytes
) -> bytes: # pylint: disable=redefined-outer-name
'''
See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value
Generation Function f6
'''
return bytes(
reversed(
aes_cmac(
bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(r))
+ bytes(reversed(io_cap))
+ bytes(reversed(a1))
+ bytes(reversed(a2)),
bytes(reversed(w)),
)
return reverse(
aes_cmac(
reverse(n1)
+ reverse(n2)
+ reverse(r)
+ reverse(io_cap)
+ reverse(a1)
+ reverse(a2),
reverse(w),
)
)
# -----------------------------------------------------------------------------
def g2(u, v, x, y):
def g2(u: bytes, v: bytes, x: bytes, y: bytes) -> int:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison
Value Generation Function g2
'''
return int.from_bytes(
aes_cmac(
bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)),
bytes(reversed(x)),
reverse(u) + reverse(v) + reverse(y),
reverse(x),
)[-4:],
byteorder='big',
)
# -----------------------------------------------------------------------------
def h6(w, key_id):
def h6(w: bytes, key_id: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
'''
return aes_cmac(key_id, w)
return reverse(aes_cmac(key_id, reverse(w)))
# -----------------------------------------------------------------------------
def h7(salt, w):
def h7(salt: bytes, w: bytes) -> bytes:
'''
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
'''
return aes_cmac(w, salt)
return reverse(aes_cmac(reverse(w), salt))
+577 -48
View File
@@ -21,8 +21,9 @@ import functools
import json
import asyncio
import logging
from contextlib import asynccontextmanager, AsyncExitStack
from contextlib import asynccontextmanager, AsyncExitStack, closing
from dataclasses import dataclass
from collections.abc import Iterable
from typing import (
Any,
Callable,
@@ -32,6 +33,8 @@ from typing import (
Optional,
Tuple,
Type,
TypeVar,
Set,
Union,
cast,
overload,
@@ -46,6 +49,7 @@ from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
@@ -82,31 +86,44 @@ from .hci import (
HCI_Constant,
HCI_Create_Connection_Cancel_Command,
HCI_Create_Connection_Command,
HCI_Create_Connection_Command,
HCI_Disconnect_Command,
HCI_Encryption_Change_Event,
HCI_Error,
HCI_IO_Capability_Request_Reply_Command,
HCI_Inquiry_Cancel_Command,
HCI_Inquiry_Command,
HCI_IsoDataPacket,
HCI_LE_Accept_CIS_Request_Command,
HCI_LE_Add_Device_To_Resolving_List_Command,
HCI_LE_Advertising_Report_Event,
HCI_LE_Clear_Resolving_List_Command,
HCI_LE_Connection_Update_Command,
HCI_LE_Create_Connection_Cancel_Command,
HCI_LE_Create_Connection_Command,
HCI_LE_Create_CIS_Command,
HCI_LE_Enable_Encryption_Command,
HCI_LE_Extended_Advertising_Report_Event,
HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command,
HCI_LE_Reject_CIS_Request_Command,
HCI_LE_Remove_Advertising_Set_Command,
HCI_LE_Set_Address_Resolution_Enable_Command,
HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Enable_Command,
HCI_LE_Set_Advertising_Parameters_Command,
HCI_LE_Set_Advertising_Set_Random_Address_Command,
HCI_LE_Set_CIG_Parameters_Command,
HCI_LE_Set_Data_Length_Command,
HCI_LE_Set_Default_PHY_Command,
HCI_LE_Set_Extended_Scan_Enable_Command,
HCI_LE_Set_Extended_Scan_Parameters_Command,
HCI_LE_Set_Extended_Scan_Response_Data_Command,
HCI_LE_Set_Extended_Advertising_Data_Command,
HCI_LE_Set_Extended_Advertising_Enable_Command,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
HCI_LE_Set_Host_Feature_Command,
HCI_LE_Set_PHY_Command,
HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command,
@@ -121,6 +138,7 @@ from .hci import (
HCI_Switch_Role_Command,
HCI_Set_Connection_Encryption_Command,
HCI_StatusError,
HCI_SynchronousDataPacket,
HCI_User_Confirmation_Request_Negative_Reply_Command,
HCI_User_Confirmation_Request_Reply_Command,
HCI_User_Passkey_Request_Negative_Reply_Command,
@@ -152,9 +170,11 @@ from .core import (
from .utils import (
AsyncRunner,
CompositeEventEmitter,
EventWatcher,
setup_event_forwarding,
composite_listener,
deprecated,
experimental,
)
from .keys import (
KeyStore,
@@ -189,6 +209,8 @@ DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
@@ -430,8 +452,11 @@ class LePhyOptions:
# -----------------------------------------------------------------------------
_PROXY_CLASS = TypeVar('_PROXY_CLASS', bound=gatt_client.ProfileServiceProxy)
class Peer:
def __init__(self, connection):
def __init__(self, connection: Connection) -> None:
self.connection = connection
# Create a GATT client for the connection
@@ -439,77 +464,113 @@ class Peer:
connection.gatt_client = self.gatt_client
@property
def services(self):
def services(self) -> List[gatt_client.ServiceProxy]:
return self.gatt_client.services
async def request_mtu(self, mtu):
async def request_mtu(self, mtu: int) -> int:
mtu = await self.gatt_client.request_mtu(mtu)
self.connection.emit('connection_att_mtu_update')
return mtu
async def discover_service(self, uuid):
async def discover_service(
self, uuid: Union[core.UUID, str]
) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_service(uuid)
async def discover_services(self, uuids=()):
async def discover_services(
self, uuids: Iterable[core.UUID] = ()
) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_services(uuids)
async def discover_included_services(self, service):
async def discover_included_services(
self, service: gatt_client.ServiceProxy
) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_included_services(service)
async def discover_characteristics(self, uuids=(), service=None):
async def discover_characteristics(
self,
uuids: Iterable[Union[core.UUID, str]] = (),
service: Optional[gatt_client.ServiceProxy] = None,
) -> List[gatt_client.CharacteristicProxy]:
return await self.gatt_client.discover_characteristics(
uuids=uuids, service=service
)
async def discover_descriptors(
self, characteristic=None, start_handle=None, end_handle=None
self,
characteristic: Optional[gatt_client.CharacteristicProxy] = None,
start_handle: Optional[int] = None,
end_handle: Optional[int] = None,
):
return await self.gatt_client.discover_descriptors(
characteristic, start_handle, end_handle
)
async def discover_attributes(self):
async def discover_attributes(self) -> List[gatt_client.AttributeProxy]:
return await self.gatt_client.discover_attributes()
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
async def subscribe(
self,
characteristic: gatt_client.CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
prefer_notify: bool = True,
) -> None:
return await self.gatt_client.subscribe(
characteristic, subscriber, prefer_notify
)
async def unsubscribe(self, characteristic, subscriber=None):
async def unsubscribe(
self,
characteristic: gatt_client.CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
) -> None:
return await self.gatt_client.unsubscribe(characteristic, subscriber)
async def read_value(self, attribute):
async def read_value(
self, attribute: Union[int, gatt_client.AttributeProxy]
) -> bytes:
return await self.gatt_client.read_value(attribute)
async def write_value(self, attribute, value, with_response=False):
async def write_value(
self,
attribute: Union[int, gatt_client.AttributeProxy],
value: bytes,
with_response: bool = False,
) -> None:
return await self.gatt_client.write_value(attribute, value, with_response)
async def read_characteristics_by_uuid(self, uuid, service=None):
async def read_characteristics_by_uuid(
self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
) -> List[bytes]:
return await self.gatt_client.read_characteristics_by_uuid(uuid, service)
def get_services_by_uuid(self, uuid):
def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]:
return self.gatt_client.get_services_by_uuid(uuid)
def get_characteristics_by_uuid(self, uuid, service=None):
def get_characteristics_by_uuid(
self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
) -> List[gatt_client.CharacteristicProxy]:
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
def create_service_proxy(self, proxy_class):
return proxy_class.from_client(self.gatt_client)
def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS:
return cast(_PROXY_CLASS, proxy_class.from_client(self.gatt_client))
async def discover_service_and_create_proxy(self, proxy_class):
async def discover_service_and_create_proxy(
self, proxy_class: Type[_PROXY_CLASS]
) -> Optional[_PROXY_CLASS]:
# Discover the first matching service and its characteristics
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
if services:
service = services[0]
await service.discover_characteristics()
return self.create_service_proxy(proxy_class)
return None
async def sustain(self, timeout=None):
async def sustain(self, timeout: Optional[float] = None) -> None:
await self.connection.sustain(timeout)
# [Classic only]
async def request_name(self):
async def request_name(self) -> str:
return await self.connection.request_remote_name()
async def __aenter__(self):
@@ -522,7 +583,7 @@ class Peer:
async def __aexit__(self, exc_type, exc_value, traceback):
pass
def __str__(self):
def __str__(self) -> str:
return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -541,6 +602,46 @@ class ConnectionParametersPreferences:
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
# -----------------------------------------------------------------------------
@dataclass
class ScoLink(CompositeEventEmitter):
device: Device
acl_connection: Connection
handle: int
link_type: int
def __post_init__(self):
super().__init__()
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
# -----------------------------------------------------------------------------
@dataclass
class CisLink(CompositeEventEmitter):
class State(IntEnum):
PENDING = 0
ESTABLISHED = 1
device: Device
acl_connection: Connection # Based ACL connection
handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
cis_id: int # CIS ID assigned by Central device
cig_id: int # CIG ID assigned by Central device
state: State = State.PENDING
def __post_init__(self):
super().__init__()
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
device: Device
@@ -722,7 +823,7 @@ class Connection(CompositeEventEmitter):
async def switch_role(self, role: int) -> None:
return await self.device.switch_role(self, role)
async def sustain(self, timeout=None):
async def sustain(self, timeout: Optional[float] = None) -> None:
"""Idles the current task waiting for a disconnect or timeout"""
abort = asyncio.get_running_loop().create_future()
@@ -819,6 +920,7 @@ class DeviceConfiguration:
self.keystore = None
self.gatt_services: List[Dict[str, Any]] = []
self.address_resolution_offload = False
self.cis_enabled = False
def load_from_dict(self, config: Dict[str, Any]) -> None:
# Load simple properties
@@ -854,6 +956,7 @@ class DeviceConfiguration:
self.address_resolution_offload = config.get(
'address_resolution_offload', self.address_resolution_offload
)
self.cis_enabled = config.get('cis_enabled', self.cis_enabled)
# Load or synthesize an IRK
irk = config.get('irk')
@@ -960,6 +1063,10 @@ class Device(CompositeEventEmitter):
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
extended_advertising_handles: Set[int]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]]
@composite_listener
class Listener:
@@ -1052,12 +1159,16 @@ class Device(CompositeEventEmitter):
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.pending_connections = {} # Connections, by BD address (BR/EDR only)
self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only)
self.cis_links = {} # CisLinks, by connection handle (LE only)
self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle
self.classic_enabled = False
self.inquiry_response = None
self.address_resolver = None
self.classic_pending_accepts = {
Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set()
# Own address type cache
self.advertising_own_address_type = None
@@ -1080,6 +1191,7 @@ class Device(CompositeEventEmitter):
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.cis_enabled = config.cis_enabled
self.classic_sc_enabled = config.classic_sc_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_smp_enabled = config.classic_smp_enabled
@@ -1390,6 +1502,16 @@ class Device(CompositeEventEmitter):
) # type: ignore[call-arg]
)
if self.cis_enabled:
await self.send_command(
HCI_LE_Set_Host_Feature_Command( # type: ignore[call-arg]
bit_number=(
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE
),
bit_value=1,
)
)
if self.classic_enabled:
await self.send_command(
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg]
@@ -1536,6 +1658,149 @@ class Device(CompositeEventEmitter):
self.advertising = False
self.auto_restart_advertising = False
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising(
self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None,
advertising_data: Optional[bytes] = None,
) -> int:
"""Starts an extended advertising set.
Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
Returns:
Handle of the new advertising set.
"""
adv_handle = -1
# Find a free handle
for i in range(
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
):
if i not in self.extended_advertising_handles:
adv_handle = i
break
if adv_handle == -1:
raise InvalidStateError('No available advertising set.')
try:
# Set the advertising parameters
await self.send_command(
HCI_LE_Set_Extended_Advertising_Parameters_Command(
advertising_handle=adv_handle,
advertising_event_properties=advertising_properties,
primary_advertising_interval_min=self.advertising_interval_min,
primary_advertising_interval_max=self.advertising_interval_max,
primary_advertising_channel_map=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39
),
own_address_type=own_address_type,
peer_address_type=target.address_type,
peer_address=target,
advertising_tx_power=7,
advertising_filter_policy=0,
primary_advertising_phy=1, # LE 1M
secondary_advertising_max_skip=0,
secondary_advertising_phy=1, # LE 1M
advertising_sid=0,
scan_request_notification_enable=0,
), # type: ignore[call-arg]
check_result=True,
)
# Set the advertising data if present
if advertising_data is not None:
await self.send_command(
HCI_LE_Set_Extended_Advertising_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
advertising_data=advertising_data,
), # type: ignore[call-arg]
check_result=True,
)
# Set the scan response if present
if scan_response is not None:
await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
), # type: ignore[call-arg]
check_result=True,
)
if own_address_type in (
OwnAddressType.RANDOM,
OwnAddressType.RESOLVABLE_OR_RANDOM,
):
await self.send_command(
HCI_LE_Set_Advertising_Set_Random_Address_Command(
advertising_handle=adv_handle,
random_address=self.random_address,
), # type: ignore[call-arg]
check_result=True,
)
# Enable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=1,
advertising_handles=[adv_handle],
durations=[0], # Forever
max_extended_advertising_events=[0], # Infinite
), # type: ignore[call-arg]
check_result=True,
)
except HCI_Error as error:
# When any step fails, cleanup the advertising handle.
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
check_result=False,
)
raise error
self.extended_advertising_handles.add(adv_handle)
return adv_handle
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None:
"""Stops an extended advertising set.
Args:
adv_handle: Handle of the advertising set to stop.
"""
# Disable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=0,
advertising_handles=[adv_handle],
durations=[0],
max_extended_advertising_events=[0],
), # type: ignore[call-arg]
check_result=True,
)
# Remove advertising set
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
check_result=True,
)
self.extended_advertising_handles.remove(adv_handle)
@property
def is_advertising(self):
return self.advertising
@@ -2170,7 +2435,9 @@ class Device(CompositeEventEmitter):
check_result=True,
)
async def disconnect(self, connection, reason):
async def disconnect(
self, connection: Union[Connection, ScoLink, CisLink], reason: int
) -> None:
# Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future()
connection.on('disconnection', pending_disconnection.set_result)
@@ -2178,7 +2445,7 @@ class Device(CompositeEventEmitter):
# Request a disconnection
result = await self.send_command(
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason)
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) # type: ignore[call-arg]
)
try:
@@ -2641,6 +2908,154 @@ class Device(CompositeEventEmitter):
self.remove_listener('remote_name', handler)
self.remove_listener('remote_name_failure', failure_handler)
# [LE only]
@experimental('Only for testing.')
async def setup_cig(
self,
cig_id: int,
cis_id: List[int],
sdu_interval: Tuple[int, int],
framing: int,
max_sdu: Tuple[int, int],
retransmission_number: int,
max_transport_latency: Tuple[int, int],
) -> List[int]:
"""Sends HCI_LE_Set_CIG_Parameters_Command.
Args:
cig_id: CIG_ID.
cis_id: CID ID list.
sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental).
framing: Un-framing(0) or Framing(1).
max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental).
retransmission_number: retransmission_number.
max_transport_latency: Max transport latencies of
(Central->Peripheral, Peripheral->Cental).
Returns:
List of created CIS handles corresponding to the same order of [cid_id].
"""
num_cis = len(cis_id)
response = await self.send_command(
HCI_LE_Set_CIG_Parameters_Command( # type: ignore[call-arg]
cig_id=cig_id,
sdu_interval_c_to_p=sdu_interval[0],
sdu_interval_p_to_c=sdu_interval[1],
worst_case_sca=0x00, # 251-500 ppm
packing=0x00, # Sequential
framing=framing,
max_transport_latency_c_to_p=max_transport_latency[0],
max_transport_latency_p_to_c=max_transport_latency[1],
cis_id=cis_id,
max_sdu_c_to_p=[max_sdu[0]] * num_cis,
max_sdu_p_to_c=[max_sdu[1]] * num_cis,
phy_c_to_p=[HCI_LE_2M_PHY] * num_cis,
phy_p_to_c=[HCI_LE_2M_PHY] * num_cis,
rtn_c_to_p=[retransmission_number] * num_cis,
rtn_p_to_c=[retransmission_number] * num_cis,
),
check_result=True,
)
# Ideally, we should manage CIG lifecycle, but they are not useful for Unicast
# Server, so here it only provides a basic functionality for testing.
cis_handles = response.return_parameters.connection_handle[:]
for id, cis_handle in zip(cis_id, cis_handles):
self._pending_cis[cis_handle] = (id, cig_id)
return cis_handles
# [LE only]
@experimental('Only for testing.')
async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]:
for cis_handle, acl_handle in cis_acl_pairs:
acl_connection = self.lookup_connection(acl_handle)
assert acl_connection
cis_id, cig_id = self._pending_cis.pop(cis_handle)
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cis_id=cis_id,
cig_id=cig_id,
)
result = await self.send_command(
HCI_LE_Create_CIS_Command( # type: ignore[call-arg]
cis_connection_handle=[p[0] for p in cis_acl_pairs],
acl_connection_handle=[p[1] for p in cis_acl_pairs],
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Create_CIS_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
pending_cis_establishments: Dict[int, asyncio.Future[CisLink]] = {}
for cis_handle, _ in cis_acl_pairs:
pending_cis_establishments[
cis_handle
] = asyncio.get_running_loop().create_future()
with closing(EventWatcher()) as watcher:
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if pending_future := pending_cis_establishments.get(
cis_link.handle, None
):
pending_future.set_result(cis_link)
return await asyncio.gather(*pending_cis_establishments.values())
# [LE only]
@experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink:
result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Accept_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
pending_cis_establishment = asyncio.get_running_loop().create_future()
with closing(EventWatcher()) as watcher:
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if cis_link.handle == handle:
pending_cis_establishment.set_result(cis_link)
return await pending_cis_establishment
# [LE only]
@experimental('Only for testing.')
async def reject_cis_request(
self,
handle: int,
reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
result = await self.send_command(
HCI_LE_Reject_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle, reason=reason
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Reject_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
@host_event_handler
def on_flush(self):
self.emit('flush')
@@ -2845,30 +3260,35 @@ class Device(CompositeEventEmitter):
)
@host_event_handler
@with_connection_from_handle
def on_disconnection(self, connection, reason):
logger.debug(
f'*** Disconnection: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, reason={reason}'
)
connection.emit('disconnection', reason)
def on_disconnection(self, connection_handle: int, reason: int) -> None:
if connection := self.connections.pop(connection_handle, None):
logger.debug(
f'*** Disconnection: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, reason={reason}'
)
connection.emit('disconnection', reason)
# Remove the connection from the map
del self.connections[connection.handle]
# Cleanup subsystems that maintain per-connection state
self.gatt_server.on_disconnection(connection)
# Cleanup subsystems that maintain per-connection state
self.gatt_server.on_disconnection(connection)
# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
logger.debug('restarting advertising')
self.abort_on(
'flush',
self.start_advertising(
advertising_type=self.advertising_type,
own_address_type=self.advertising_own_address_type,
auto_restart=True,
),
# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
logger.debug('restarting advertising')
self.abort_on(
'flush',
self.start_advertising(
advertising_type=self.advertising_type, # type: ignore[arg-type]
own_address_type=self.advertising_own_address_type, # type: ignore[arg-type]
auto_restart=True,
),
)
elif sco_link := self.sco_links.pop(connection_handle, None):
sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None):
cis_link.emit('disconnection', reason)
else:
logger.error(
f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***'
)
@host_event_handler
@@ -3147,6 +3567,107 @@ class Device(CompositeEventEmitter):
connection.emit('remote_name_failure', error)
self.emit('remote_name_failure', address, error)
# [Classic only]
@host_event_handler
@with_connection_from_address
@experimental('Only for testing.')
def on_sco_connection(
self, acl_connection: Connection, sco_handle: int, link_type: int
) -> None:
logger.debug(
f'*** SCO connected: {acl_connection.peer_address}, '
f'sco_handle=[0x{sco_handle:04X}], '
f'link_type=[0x{link_type:02X}] ***'
)
sco_link = self.sco_links[sco_handle] = ScoLink(
device=self,
acl_connection=acl_connection,
handle=sco_handle,
link_type=link_type,
)
self.emit('sco_connection', sco_link)
# [Classic only]
@host_event_handler
@with_connection_from_address
@experimental('Only for testing.')
def on_sco_connection_failure(
self, acl_connection: Connection, status: int
) -> None:
logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
self.emit('sco_connection_failure')
# [Classic only]
@host_event_handler
@experimental('Only for testing')
def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None:
if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet)
# [LE only]
@host_event_handler
@with_connection_from_handle
@experimental('Only for testing')
def on_cis_request(
self,
acl_connection: Connection,
cis_handle: int,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'*** CIS Request '
f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, '
f'cis_handle=[0x{cis_handle:04X}], '
f'cig_id=[0x{cig_id:02X}], '
f'cis_id=[0x{cis_id:02X}] ***'
)
# LE_CIS_Established event doesn't provide info, so we must store them here.
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cig_id=cig_id,
cis_id=cis_id,
)
self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_cis_establishment(self, cis_handle: int) -> None:
cis_link = self.cis_links[cis_handle]
cis_link.state = CisLink.State.ESTABLISHED
assert cis_link.acl_connection
logger.debug(
f'*** CIS Establishment '
f'{cis_link.acl_connection.peer_address}, '
f'cis_handle=[0x{cis_handle:04X}], '
f'cig_id=[0x{cis_link.cig_id:02X}], '
f'cis_id=[0x{cis_link.cis_id:02X}] ***'
)
cis_link.emit('establishment')
self.emit('cis_establishment', cis_link)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
if cis_link := self.cis_links.pop(cis_handle, None):
cis_link.emit('establishment_failure')
self.emit('cis_establishment_failure', cis_handle, status)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None:
if cis_link := self.cis_links.get(handle, None):
cis_link.emit('pdu', packet)
@host_event_handler
@with_connection_from_handle
def on_connection_encryption_change(self, connection, encryption):
@@ -3158,10 +3679,18 @@ class Device(CompositeEventEmitter):
connection.encryption = encryption
if (
not connection.authenticated
and connection.transport == BT_BR_EDR_TRANSPORT
and encryption == HCI_Encryption_Change_Event.AES_CCM
):
connection.authenticated = True
connection.sc = True
if (
not connection.authenticated
and connection.transport == BT_LE_TRANSPORT
and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM
):
connection.authenticated = True
connection.sc = True
connection.emit('connection_encryption_change')
@host_event_handler
+116 -6
View File
@@ -93,20 +93,35 @@ GATT_RECONNECTION_CONFIGURATION_SERVICE = UUID.from_16_bits(0x1829, 'Reconne
GATT_INSULIN_DELIVERY_SERVICE = UUID.from_16_bits(0x183A, 'Insulin Delivery')
GATT_BINARY_SENSOR_SERVICE = UUID.from_16_bits(0x183B, 'Binary Sensor')
GATT_EMERGENCY_CONFIGURATION_SERVICE = UUID.from_16_bits(0x183C, 'Emergency Configuration')
GATT_AUTHORIZATION_CONTROL_SERVICE = UUID.from_16_bits(0x183D, 'Authorization Control')
GATT_PHYSICAL_ACTIVITY_MONITOR_SERVICE = UUID.from_16_bits(0x183E, 'Physical Activity Monitor')
GATT_ELAPSED_TIME_SERVICE = UUID.from_16_bits(0x183F, 'Elapsed Time')
GATT_GENERIC_HEALTH_SENSOR_SERVICE = UUID.from_16_bits(0x1840, 'Generic Health Sensor')
GATT_AUDIO_INPUT_CONTROL_SERVICE = UUID.from_16_bits(0x1843, 'Audio Input Control')
GATT_VOLUME_CONTROL_SERVICE = UUID.from_16_bits(0x1844, 'Volume Control')
GATT_VOLUME_OFFSET_CONTROL_SERVICE = UUID.from_16_bits(0x1845, 'Volume Offset Control')
GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification Service')
GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification')
GATT_DEVICE_TIME_SERVICE = UUID.from_16_bits(0x1847, 'Device Time')
GATT_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1848, 'Media Control Service')
GATT_GENERIC_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1849, 'Generic Media Control Service')
GATT_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1848, 'Media Control')
GATT_GENERIC_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1849, 'Generic Media Control')
GATT_CONSTANT_TONE_EXTENSION_SERVICE = UUID.from_16_bits(0x184A, 'Constant Tone Extension')
GATT_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184B, 'Telephone Bearer Service')
GATT_GENERIC_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer Service')
GATT_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184B, 'Telephone Bearer')
GATT_GENERIC_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer')
GATT_MICROPHONE_CONTROL_SERVICE = UUID.from_16_bits(0x184D, 'Microphone Control')
GATT_AUDIO_STREAM_CONTROL_SERVICE = UUID.from_16_bits(0x184E, 'Audio Stream Control')
GATT_BROADCAST_AUDIO_SCAN_SERVICE = UUID.from_16_bits(0x184F, 'Broadcast Audio Scan')
GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE = UUID.from_16_bits(0x1850, 'Published Audio Capabilities')
GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1851, 'Basic Audio Announcement')
GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1852, 'Broadcast Audio Announcement')
GATT_COMMON_AUDIO_SERVICE = UUID.from_16_bits(0x1853, 'Common Audio')
GATT_HEARING_ACCESS_SERVICE = UUID.from_16_bits(0x1854, 'Hearing Access')
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE = UUID.from_16_bits(0x1855, 'Telephony and Media Audio')
GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1856, 'Public Broadcast Announcement')
GATT_ELECTRONIC_SHELF_LABEL_SERVICE = UUID.from_16_bits(0X1857, 'Electronic Shelf Label')
GATT_GAMING_AUDIO_SERVICE = UUID.from_16_bits(0x1858, 'Gaming Audio')
GATT_MESH_PROXY_SOLICITATION_SERVICE = UUID.from_16_bits(0x1859, 'Mesh Audio Solicitation')
# Types
# Attribute Types
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2800, 'Primary Service')
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2801, 'Secondary Service')
GATT_INCLUDE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2802, 'Include')
@@ -129,6 +144,8 @@ GATT_ENVIRONMENTAL_SENSING_MEASUREMENT_DESCRIPTOR = UUID.from_16_bits(0x290C,
GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting')
GATT_TIME_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290E, 'Time Trigger Setting')
GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
GATT_OBSERVATION_SCHEDULE_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Observation Schedule')
GATT_VALID_RANGE_AND_ACCURACY_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Valid Range And Accuracy')
# Device Information Service
GATT_SYSTEM_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A23, 'System ID')
@@ -156,6 +173,96 @@ GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart
# Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
# Telephony And Media Audio Service (TMAS)
GATT_TMAP_ROLE_CHARACTERISTIC = UUID.from_16_bits(0x2B51, 'TMAP Role')
# Audio Input Control Service (AICS)
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B77, 'Audio Input State')
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC = UUID.from_16_bits(0x2B78, 'Gain Settings Attribute')
GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC = UUID.from_16_bits(0x2B79, 'Audio Input Type')
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC = UUID.from_16_bits(0x2B7A, 'Audio Input Status')
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7B, 'Audio Input Control Point')
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC = UUID.from_16_bits(0x2B7C, 'Audio Input Description')
# Volume Control Service (VCS)
GATT_VOLUME_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B7D, 'Volume State')
GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7E, 'Volume Control Point')
GATT_VOLUME_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2B7F, 'Volume Flags')
# Volume Offset Control Service (VOCS)
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B80, 'Volume Offset State')
GATT_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2B81, 'Audio Location')
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B82, 'Volume Offset Control Point')
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC = UUID.from_16_bits(0x2B83, 'Audio Output Description')
# Coordinated Set Identification Service (CSIS)
GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC = UUID.from_16_bits(0x2B84, 'Set Identity Resolving Key')
GATT_COORDINATED_SET_SIZE_CHARACTERISTIC = UUID.from_16_bits(0x2B85, 'Coordinated Set Size')
GATT_SET_MEMBER_LOCK_CHARACTERISTIC = UUID.from_16_bits(0x2B86, 'Set Member Lock')
GATT_SET_MEMBER_RANK_CHARACTERISTIC = UUID.from_16_bits(0x2B87, 'Set Member Rank')
# Media Control Service (MCS)
GATT_MEDIA_PLAYER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2B93, 'Media Player Name')
GATT_MEDIA_PLAYER_ICON_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B94, 'Media Player Icon Object ID')
GATT_MEDIA_PLAYER_ICON_URL_CHARACTERISTIC = UUID.from_16_bits(0x2B95, 'Media Player Icon URL')
GATT_TRACK_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2B96, 'Track Changed')
GATT_TRACK_TITLE_CHARACTERISTIC = UUID.from_16_bits(0x2B97, 'Track Title')
GATT_TRACK_DURATION_CHARACTERISTIC = UUID.from_16_bits(0x2B98, 'Track Duration')
GATT_TRACK_POSITION_CHARACTERISTIC = UUID.from_16_bits(0x2B99, 'Track Position')
GATT_PLAYBACK_SPEED_CHARACTERISTIC = UUID.from_16_bits(0x2B9A, 'Playback Speed')
GATT_SEEKING_SPEED_CHARACTERISTIC = UUID.from_16_bits(0x2B9B, 'Seeking Speed')
GATT_CURRENT_TRACK_SEGMENTS_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9C, 'Current Track Segments Object ID')
GATT_CURRENT_TRACK_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9D, 'Current Track Object ID')
GATT_NEXT_TRACK_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9E, 'Next Track Object ID')
GATT_PARENT_GROUP_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9F, 'Parent Group Object ID')
GATT_CURRENT_GROUP_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BA0, 'Current Group Object ID')
GATT_PLAYING_ORDER_CHARACTERISTIC = UUID.from_16_bits(0x2BA1, 'Playing Order')
GATT_PLAYING_ORDERS_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA2, 'Playing Orders Supported')
GATT_MEDIA_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BA3, 'Media State')
GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BA4, 'Media Control Point')
GATT_MEDIA_CONTROL_POINT_OPCODES_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA5, 'Media Control Point Opcodes Supported')
GATT_SEARCH_RESULTS_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BA6, 'Search Results Object ID')
GATT_SEARCH_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BA7, 'Search Control Point')
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Content Control Id')
# Telephone Bearer Service (TBS)
GATT_BEARER_PROVIDER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BB4, 'Bearer Provider Name')
GATT_BEARER_UCI_CHARACTERISTIC = UUID.from_16_bits(0x2BB5, 'Bearer UCI')
GATT_BEARER_TECHNOLOGY_CHARACTERISTIC = UUID.from_16_bits(0x2BB6, 'Bearer Technology')
GATT_BEARER_URI_SCHEMES_SUPPORTED_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2BB7, 'Bearer URI Schemes Supported List')
GATT_BEARER_SIGNAL_STRENGTH_CHARACTERISTIC = UUID.from_16_bits(0x2BB8, 'Bearer Signal Strength')
GATT_BEARER_SIGNAL_STRENGTH_REPORTING_INTERVAL_CHARACTERISTIC = UUID.from_16_bits(0x2BB9, 'Bearer Signal Strength Reporting Interval')
GATT_BEARER_LIST_CURRENT_CALLS_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Bearer List Current Calls')
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBB, 'Content Control ID')
GATT_STATUS_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2BBC, 'Status Flags')
GATT_INCOMING_CALL_TARGET_BEARER_URI_CHARACTERISTIC = UUID.from_16_bits(0x2BBD, 'Incoming Call Target Bearer URI')
GATT_CALL_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BBE, 'Call State')
GATT_CALL_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BBF, 'Call Control Point')
GATT_CALL_CONTROL_POINT_OPTIONAL_OPCODES_CHARACTERISTIC = UUID.from_16_bits(0x2BC0, 'Call Control Point Optional Opcodes')
GATT_TERMINATION_REASON_CHARACTERISTIC = UUID.from_16_bits(0x2BC1, 'Termination Reason')
GATT_INCOMING_CALL_CHARACTERISTIC = UUID.from_16_bits(0x2BC2, 'Incoming Call')
GATT_CALL_FRIENDLY_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Call Friendly Name')
# Microphone Control Service (MICS)
GATT_MUTE_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Mute')
# Audio Stream Control Service (ASCS)
GATT_SINK_ASE_CHARACTERISTIC = UUID.from_16_bits(0x2BC4, 'Sink ASE')
GATT_SOURCE_ASE_CHARACTERISTIC = UUID.from_16_bits(0x2BC5, 'Source ASE')
GATT_ASE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC6, 'ASE Control Point')
# Broadcast Audio Scan Service (BASS)
GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC7, 'Broadcast Audio Scan Control Point')
GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BC8, 'Broadcast Receive State')
# Published Audio Capabilities Service (PACS)
GATT_SINK_PAC_CHARACTERISTIC = UUID.from_16_bits(0x2BC9, 'Sink PAC')
GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCA, 'Sink Audio Location')
GATT_SOURCE_PAC_CHARACTERISTIC = UUID.from_16_bits(0x2BCB, 'Source PAC')
GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCC, 'Source Audio Location')
GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts')
GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts')
# ASHA Service
GATT_ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
@@ -177,6 +284,9 @@ GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bi
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B29, 'Client Supported Features')
GATT_DATABASE_HASH_CHARACTERISTIC = UUID.from_16_bits(0x2B2A, 'Database Hash')
GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B3A, 'Server Supported Features')
# fmt: on
# pylint: enable=line-too-long
+56 -20
View File
@@ -38,6 +38,7 @@ from typing import (
Any,
Iterable,
Type,
Set,
TYPE_CHECKING,
)
@@ -128,7 +129,7 @@ class ServiceProxy(AttributeProxy):
included_services: List[ServiceProxy]
@staticmethod
def from_client(service_class, client, service_uuid):
def from_client(service_class, client: Client, service_uuid: UUID):
# The service and its characteristics are considered to have already been
# discovered
services = client.get_services_by_uuid(service_uuid)
@@ -206,11 +207,11 @@ class CharacteristicProxy(AttributeProxy):
return await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None):
async def unsubscribe(self, subscriber=None, force=False):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return await self.client.unsubscribe(self, subscriber)
return await self.client.unsubscribe(self, subscriber, force)
def __str__(self) -> str:
return (
@@ -246,8 +247,12 @@ class ProfileServiceProxy:
class Client:
services: List[ServiceProxy]
cached_values: Dict[int, Tuple[datetime, bytes]]
notification_subscribers: Dict[int, Callable[[bytes], Any]]
indication_subscribers: Dict[int, Callable[[bytes], Any]]
notification_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
indication_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
pending_response: Optional[asyncio.futures.Future[ATT_PDU]]
pending_request: Optional[ATT_PDU]
@@ -257,10 +262,8 @@ class Client:
self.request_semaphore = asyncio.Semaphore(1)
self.pending_request = None
self.pending_response = None
self.notification_subscribers = (
{}
) # Notification subscribers, by attribute handle
self.indication_subscribers = {} # Indication subscribers, by attribute handle
self.notification_subscribers = {} # Subscriber set, by attribute handle
self.indication_subscribers = {} # Subscriber set, by attribute handle
self.services = []
self.cached_values = {}
@@ -682,8 +685,8 @@ class Client:
async def discover_descriptors(
self,
characteristic: Optional[CharacteristicProxy] = None,
start_handle=None,
end_handle=None,
start_handle: Optional[int] = None,
end_handle: Optional[int] = None,
) -> List[DescriptorProxy]:
'''
See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors
@@ -789,7 +792,12 @@ class Client:
return attributes
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
async def subscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
prefer_notify: bool = True,
) -> None:
# If we haven't already discovered the descriptors for this characteristic,
# do it now
if not characteristic.descriptors_discovered:
@@ -826,6 +834,7 @@ class Client:
subscriber_set = subscribers.setdefault(characteristic.handle, set())
if subscriber is not None:
subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the
# characteristic emitting an 'update' event when a notification or indication
# is received
@@ -833,7 +842,18 @@ class Client:
await self.write_value(cccd, struct.pack('<H', bits), with_response=True)
async def unsubscribe(self, characteristic, subscriber=None):
async def unsubscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
force: bool = False,
) -> None:
'''
Unsubscribe from a characteristic.
If `force` is True, this will write zeros to the CCCD when there are no
subscribers left, even if there were already no registered subscribers.
'''
# If we haven't already discovered the descriptors for this characteristic,
# do it now
if not characteristic.descriptors_discovered:
@@ -847,31 +867,45 @@ class Client:
logger.warning('unsubscribing from characteristic with no CCCD descriptor')
return
# Check if the characteristic has subscribers
if not (
characteristic.handle in self.notification_subscribers
or characteristic.handle in self.indication_subscribers
):
if not force:
return
# Remove the subscriber(s)
if subscriber is not None:
# Remove matching subscriber from subscriber sets
for subscriber_set in (
self.notification_subscribers,
self.indication_subscribers,
):
subscribers = subscriber_set.get(characteristic.handle, [])
if subscriber in subscribers:
if (
subscribers := subscriber_set.get(characteristic.handle)
) and subscriber in subscribers:
subscribers.remove(subscriber)
# Cleanup if we removed the last one
if not subscribers:
del subscriber_set[characteristic.handle]
else:
# Remove all subscribers for this attribute from the sets!
# Remove all subscribers for this attribute from the sets
self.notification_subscribers.pop(characteristic.handle, None)
self.indication_subscribers.pop(characteristic.handle, None)
if not self.notification_subscribers and not self.indication_subscribers:
# Update the CCCD
if not (
characteristic.handle in self.notification_subscribers
or characteristic.handle in self.indication_subscribers
):
# No more subscribers left
await self.write_value(cccd, b'\x00\x00', with_response=True)
async def read_value(
self, attribute: Union[int, AttributeProxy], no_long_read: bool = False
) -> Any:
) -> bytes:
'''
See Vol 3, Part G - 4.8.1 Read Characteristic Value
@@ -1067,7 +1101,7 @@ class Client:
def on_att_handle_value_notification(self, notification):
# Call all subscribers
subscribers = self.notification_subscribers.get(
notification.attribute_handle, []
notification.attribute_handle, set()
)
if not subscribers:
logger.warning('!!! received notification with no subscriber')
@@ -1081,7 +1115,9 @@ class Client:
def on_att_handle_value_indication(self, indication):
# Call all subscribers
subscribers = self.indication_subscribers.get(indication.attribute_handle, [])
subscribers = self.indication_subscribers.get(
indication.attribute_handle, set()
)
if not subscribers:
logger.warning('!!! received indication with no subscriber')
+113 -99
View File
@@ -17,6 +17,7 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import collections
import dataclasses
import enum
import functools
import logging
@@ -1382,6 +1383,45 @@ HCI_LE_SUPPORTED_FEATURES_NAMES = {
STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)}
class CodecID(enum.IntEnum):
# fmt: off
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
LINEAR_PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
VENDOR_SPECIFIC = 0xFF
@dataclasses.dataclass(frozen=True)
class CodingFormat:
codec_id: CodecID
company_id: int = 0
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int):
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
return offset + 5, cls(
codec_id=CodecID(codec_id),
company_id=company_id,
vendor_specific_codec_id=vendor_specific_codec_id,
)
def to_bytes(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# -----------------------------------------------------------------------------
class HCI_Constant:
@staticmethod
@@ -1477,6 +1517,12 @@ class HCI_Object:
# The rest of the bytes
field_value = data[offset:]
return (field_value, len(field_value))
if field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_length = data[offset]
offset += 1
field_value = data[offset : offset + field_length]
return (field_value, field_length + 1)
if field_type == 1:
# 8-bit unsigned
return (data[offset], 1)
@@ -1581,6 +1627,11 @@ class HCI_Object:
raise ValueError('value too large for *-typed field')
else:
field_bytes = bytes(field_value)
elif field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_bytes = bytes(field_bytes)
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes'
):
@@ -1888,6 +1939,7 @@ Address.NIL = Address(b"\xff\xff\xff\xff\xff\xff", Address.PUBLIC_DEVICE_ADDRESS
Address.ANY = Address(b"\x00\x00\x00\x00\x00\x00", Address.PUBLIC_DEVICE_ADDRESS)
Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_ADDRESS)
# -----------------------------------------------------------------------------
class OwnAddressType:
PUBLIC = 0
@@ -2445,14 +2497,14 @@ class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command):
('connection_handle', 2),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', CodingFormat.parse_from_bytes),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
@@ -2473,22 +2525,6 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
See Bluetooth spec @ 7.1.45 Enhanced Setup Synchronous Connection Command
'''
class CodingFormat(enum.IntEnum):
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
def to_bytes(self):
return self.value.to_bytes(5, 'little')
def __bytes__(self):
return self.to_bytes()
class PcmDataFormat(enum.IntEnum):
NA = 0x00
ONES_COMPLEMENT = 0x01
@@ -2525,14 +2561,14 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
('bd_addr', Address.parse_address),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', CodingFormat.parse_from_bytes),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
@@ -3829,8 +3865,10 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'advertising_event_properties',
{
'size': 2,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string(
x
'mapper': lambda x: str(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
x
)
),
},
),
@@ -3840,8 +3878,8 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'primary_advertising_channel_map',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(
x
'mapper': lambda x: str(
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap(x)
),
},
),
@@ -3863,38 +3901,33 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
'''
CONNECTABLE_ADVERTISING = 0
SCANNABLE_ADVERTISING = 1
DIRECTED_ADVERTISING = 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3
USE_LEGACY_ADVERTISING_PDUS = 4
ANONYMOUS_ADVERTISING = 5
INCLUDE_TX_POWER = 6
class AdvertisingProperties(enum.IntFlag):
CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1
DIRECTED_ADVERTISING = 1 << 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3
USE_LEGACY_ADVERTISING_PDUS = 1 << 4
ANONYMOUS_ADVERTISING = 1 << 5
INCLUDE_TX_POWER = 1 << 6
ADVERTISING_PROPERTIES_NAMES = (
'CONNECTABLE_ADVERTISING',
'SCANNABLE_ADVERTISING',
'DIRECTED_ADVERTISING',
'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING',
'USE_LEGACY_ADVERTISING_PDUS',
'ANONYMOUS_ADVERTISING',
'INCLUDE_TX_POWER',
)
def __str__(self) -> str:
return '|'.join(
flag.name
for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
if self.value & flag.value and flag.name is not None
)
CHANNEL_37 = 0
CHANNEL_38 = 1
CHANNEL_39 = 2
class ChannelMap(enum.IntFlag):
CHANNEL_37 = 1 << 0
CHANNEL_38 = 1 << 1
CHANNEL_39 = 1 << 2
CHANNEL_NAMES = ('37', '38', '39')
@classmethod
def advertising_properties_string(cls, properties):
# pylint: disable=line-too-long
return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]'
@classmethod
def channel_map_string(cls, channel_map):
return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
def __str__(self) -> str:
return '|'.join(
flag.name
for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap
if self.value & flag.value and flag.name is not None
)
# -----------------------------------------------------------------------------
@@ -3906,9 +3939,9 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
),
).name,
},
),
('fragment_preference', 1),
@@ -3926,23 +3959,12 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
'''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
UNCHANGED_DATA: 'UNCHANGED_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
class Operation(enum.IntEnum):
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04
# -----------------------------------------------------------------------------
@@ -3954,9 +3976,9 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
),
).name,
},
),
('fragment_preference', 1),
@@ -3974,22 +3996,6 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
'''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# -----------------------------------------------------------------------------
@HCI_Command.command(
@@ -4481,7 +4487,10 @@ class HCI_LE_Accept_CIS_Request_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('connection_handle', 2)],
fields=[
('connection_handle', 2),
('reason', {'size': 1, 'mapper': HCI_Constant.error_name}),
],
)
class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
'''
@@ -4489,6 +4498,7 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
'''
connection_handle: int
reason: int
# -----------------------------------------------------------------------------
@@ -4497,9 +4507,9 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
('connection_handle', 2),
('data_path_direction', 1),
('data_path_id', 1),
('codec_id', 5),
('codec_id', CodingFormat.parse_from_bytes),
('controller_delay', 3),
('codec_configuration', '*'),
('codec_configuration', 'v'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
@@ -4514,9 +4524,9 @@ class HCI_LE_Setup_ISO_Data_Path_Command(HCI_Command):
connection_handle: int
data_path_direction: int
data_path_id: int
codec_id: int
codec_id: CodingFormat
controller_delay: int
codec_configuration: int
codec_configuration: bytes
# -----------------------------------------------------------------------------
@@ -5326,6 +5336,10 @@ class HCI_Disconnection_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.5 Disconnection Complete Event
'''
status: int
connection_handle: int
reason: int
# -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)])
+67 -42
View File
@@ -15,30 +15,39 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from collections.abc import Callable, MutableMapping
from typing import cast, Any
import logging
from .colors import color
from .att import ATT_CID, ATT_PDU
from .smp import SMP_CID, SMP_Command
from .core import name_or_number
from .l2cap import (
from bumble import avdtp
from bumble.colors import color
from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
from bumble.core import name_or_number
from bumble.l2cap import (
L2CAP_PDU,
L2CAP_CONNECTION_REQUEST,
L2CAP_CONNECTION_RESPONSE,
L2CAP_SIGNALING_CID,
L2CAP_LE_SIGNALING_CID,
L2CAP_Control_Frame,
L2CAP_Connection_Request,
L2CAP_Connection_Response,
)
from .hci import (
from bumble.hci import (
HCI_EVENT_PACKET,
HCI_ACL_DATA_PACKET,
HCI_DISCONNECTION_COMPLETE_EVENT,
HCI_AclDataPacketAssembler,
HCI_Packet,
HCI_Event,
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
)
from .rfcomm import RFCOMM_Frame, RFCOMM_PSM
from .sdp import SDP_PDU, SDP_PSM
from .avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM
# -----------------------------------------------------------------------------
# Logging
@@ -50,23 +59,25 @@ logger = logging.getLogger(__name__)
PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP',
AVDTP_PSM: 'AVDTP'
# TODO: add more PSM values
avdtp.AVDTP_PSM: 'AVDTP',
}
# -----------------------------------------------------------------------------
class PacketTracer:
class AclStream:
def __init__(self, analyzer):
psms: MutableMapping[int, int]
peer: PacketTracer.AclStream
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None # ACL stream in the other direction
# pylint: disable=too-many-nested-blocks
def on_acl_pdu(self, pdu):
def on_acl_pdu(self, pdu: bytes) -> None:
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
if l2cap_pdu.cid == ATT_CID:
@@ -81,26 +92,30 @@ class PacketTracer:
# Check if this signals a new channel
if control_frame.code == L2CAP_CONNECTION_REQUEST:
self.psms[control_frame.source_cid] = control_frame.psm
connection_request = cast(L2CAP_Connection_Request, control_frame)
self.psms[connection_request.source_cid] = connection_request.psm
elif control_frame.code == L2CAP_CONNECTION_RESPONSE:
connection_response = cast(L2CAP_Connection_Response, control_frame)
if (
control_frame.result
connection_response.result
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
):
if self.peer:
if psm := self.peer.psms.get(control_frame.source_cid):
if psm := self.peer.psms.get(
connection_response.source_cid
):
# Found a pending connection
self.psms[control_frame.destination_cid] = psm
self.psms[connection_response.destination_cid] = psm
# For AVDTP connections, create a packet assembler for
# each direction
if psm == AVDTP_PSM:
if psm == avdtp.AVDTP_PSM:
self.avdtp_assemblers[
control_frame.source_cid
] = AVDTP_MessageAssembler(self.on_avdtp_message)
connection_response.source_cid
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
control_frame.destination_cid
] = AVDTP_MessageAssembler(
connection_response.destination_cid
] = avdtp.MessageAssembler(
self.peer.on_avdtp_message
)
@@ -113,7 +128,7 @@ class PacketTracer:
elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame)
elif psm == AVDTP_PSM:
elif psm == avdtp.AVDTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
@@ -130,22 +145,26 @@ class PacketTracer:
else:
self.analyzer.emit(l2cap_pdu)
def on_avdtp_message(self, transaction_label, message):
def on_avdtp_message(
self, transaction_label: int, message: avdtp.Message
) -> None:
self.analyzer.emit(
f'{color("AVDTP", "green")} [{transaction_label}] {message}'
)
def feed_packet(self, packet):
def feed_packet(self, packet: HCI_AclDataPacket) -> None:
self.packet_assembler.feed_packet(packet)
class Analyzer:
def __init__(self, label, emit_message):
acl_streams: MutableMapping[int, PacketTracer.AclStream]
peer: PacketTracer.Analyzer
def __init__(self, label: str, emit_message: Callable[..., None]) -> None:
self.label = label
self.emit_message = emit_message
self.acl_streams = {} # ACL streams, by connection handle
self.peer = None # Analyzer in the other direction
def start_acl_stream(self, connection_handle):
def start_acl_stream(self, connection_handle: int) -> PacketTracer.AclStream:
logger.info(
f'[{self.label}] +++ Creating ACL stream for connection '
f'0x{connection_handle:04X}'
@@ -160,7 +179,7 @@ class PacketTracer:
return stream
def end_acl_stream(self, connection_handle):
def end_acl_stream(self, connection_handle: int) -> None:
if connection_handle in self.acl_streams:
logger.info(
f'[{self.label}] --- Removing ACL stream for connection '
@@ -171,23 +190,29 @@ class PacketTracer:
# Let the other forwarder know so it can cleanup its stream as well
self.peer.end_acl_stream(connection_handle)
def on_packet(self, packet):
def on_packet(self, packet: HCI_Packet) -> None:
self.emit(packet)
if packet.hci_packet_type == HCI_ACL_DATA_PACKET:
acl_packet = cast(HCI_AclDataPacket, packet)
# Look for an existing stream for this handle, create one if it is the
# first ACL packet for that connection handle
if (stream := self.acl_streams.get(packet.connection_handle)) is None:
stream = self.start_acl_stream(packet.connection_handle)
stream.feed_packet(packet)
if (
stream := self.acl_streams.get(acl_packet.connection_handle)
) is None:
stream = self.start_acl_stream(acl_packet.connection_handle)
stream.feed_packet(acl_packet)
elif packet.hci_packet_type == HCI_EVENT_PACKET:
if packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(packet.connection_handle)
event_packet = cast(HCI_Event, packet)
if event_packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(
cast(HCI_Disconnection_Complete_Event, packet).connection_handle
)
def emit(self, message):
def emit(self, message: Any) -> None:
self.emit_message(f'[{self.label}] {message}')
def trace(self, packet, direction=0):
def trace(self, packet: HCI_Packet, direction: int = 0) -> None:
if direction == 0:
self.host_to_controller_analyzer.on_packet(packet)
else:
@@ -195,10 +220,10 @@ class PacketTracer:
def __init__(
self,
host_to_controller_label=color('HOST->CONTROLLER', 'blue'),
controller_to_host_label=color('CONTROLLER->HOST', 'cyan'),
emit_message=logger.info,
):
host_to_controller_label: str = color('HOST->CONTROLLER', 'blue'),
controller_to_host_label: str = color('CONTROLLER->HOST', 'cyan'),
emit_message: Callable[..., None] = logger.info,
) -> None:
self.host_to_controller_analyzer = PacketTracer.Analyzer(
host_to_controller_label, emit_message
)
+37 -27
View File
@@ -22,7 +22,7 @@ import dataclasses
import enum
import traceback
import warnings
from typing import Dict, List, Union, Set, TYPE_CHECKING
from typing import Dict, List, Union, Set, Any, TYPE_CHECKING
from . import at
from . import rfcomm
@@ -35,7 +35,11 @@ from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
)
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hci import (
HCI_Enhanced_Setup_Synchronous_Connection_Command,
CodingFormat,
CodecID,
)
from bumble.sdp import (
DataElement,
ServiceAttribute,
@@ -66,6 +70,7 @@ class HfpProtocolError(ProtocolError):
# Protocol Support
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class HfpProtocol:
dlc: rfcomm.DLC
@@ -842,19 +847,15 @@ class DefaultCodecParameters(enum.IntEnum):
@dataclasses.dataclass
class EscoParameters:
# Codec specific
transmit_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat
receive_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat
transmit_coding_format: CodingFormat
receive_coding_format: CodingFormat
packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType
retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort
max_latency: int
# Common
input_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT
)
output_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT
)
input_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
input_coded_data_size: int = 16
output_coded_data_size: int = 16
input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = (
@@ -880,26 +881,31 @@ class EscoParameters:
transmit_codec_frame_size: int = 60
receive_codec_frame_size: int = 60
def asdict(self) -> Dict[str, Any]:
# dataclasses.asdict() will recursively deep-copy the entire object,
# which is expensive and breaks CodingFormat object, so let it simply copy here.
return self.__dict__
_ESCO_PARAMETERS_CVSD_D0 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
)
_ESCO_PARAMETERS_CVSD_D1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
)
_ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -912,8 +918,8 @@ _ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -925,8 +931,8 @@ _ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000A,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -938,8 +944,8 @@ _ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000C,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -951,8 +957,8 @@ _ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
)
_ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x0008,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -960,12 +966,14 @@ _ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
_ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x000D,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -974,10 +982,12 @@ _ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
ESCO_PERAMETERS = {
ESCO_PARAMETERS = {
DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0,
DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1,
DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1,
+27 -3
View File
@@ -32,8 +32,8 @@ from .hci import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET,
HCI_COMMAND_COMPLETE_EVENT,
HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
@@ -52,6 +52,7 @@ from .hci import (
HCI_Constant,
HCI_Error,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command,
@@ -75,7 +76,6 @@ from .core import (
BT_LE_TRANSPORT,
ConnectionPHY,
ConnectionParameters,
InvalidStateError,
)
from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
@@ -243,7 +243,7 @@ class Host(AbortableEventEmitter):
# understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
le_event_mask = bytes.fromhex('FFFFF00000000000')
le_event_mask = bytes.fromhex('FFFFFFFF00000000')
await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
@@ -495,6 +495,8 @@ class Host(AbortableEventEmitter):
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
@@ -515,6 +517,10 @@ class Host(AbortableEventEmitter):
# Experimental
self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
self.emit('l2cap_pdu', connection.handle, cid, pdu)
@@ -715,6 +721,24 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event)
def on_hci_le_cis_request_event(self, event):
self.emit(
'cis_request',
event.acl_connection_handle,
event.cis_connection_handle,
event.cig_id,
event.cis_id,
)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
'cis_establishment_failure', event.connection_handle, event.status
)
def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections:
logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle')
+8
View File
@@ -391,6 +391,9 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
'''
psm: int
source_cid: int
@staticmethod
def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]:
psm_length = 2
@@ -432,6 +435,11 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
'''
source_cid: int
destination_cid: int
status: int
result: int
CONNECTION_SUCCESSFUL = 0x0000
CONNECTION_PENDING = 0x0001
CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
+147
View File
@@ -0,0 +1,147 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import Optional
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class SirkType(enum.IntEnum):
'''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.'''
ENCRYPTED = 0x00
PLAINTEXT = 0x01
class MemberLock(enum.IntEnum):
'''Coordinated Set Identification Service - 5.3 Set Member Lock.'''
UNLOCKED = 0x01
LOCKED = 0x02
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
# TODO: Implement RSI Generator
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class CoordinatedSetIdentificationService(gatt.TemplateService):
UUID = gatt.GATT_COORDINATED_SET_IDENTIFICATION_SERVICE
set_identity_resolving_key_characteristic: gatt.Characteristic
coordinated_set_size_characteristic: Optional[gatt.Characteristic] = None
set_member_lock_characteristic: Optional[gatt.Characteristic] = None
set_member_rank_characteristic: Optional[gatt.Characteristic] = None
def __init__(
self,
set_identity_resolving_key: bytes,
coordinated_set_size: Optional[int] = None,
set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None,
) -> None:
characteristics = []
self.set_identity_resolving_key_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
# TODO: Implement encrypted SIRK reader.
value=struct.pack('B', SirkType.PLAINTEXT) + set_identity_resolving_key,
)
characteristics.append(self.set_identity_resolving_key_characteristic)
if coordinated_set_size is not None:
self.coordinated_set_size_characteristic = gatt.Characteristic(
uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('B', coordinated_set_size),
)
characteristics.append(self.coordinated_set_size_characteristic)
if set_member_lock is not None:
self.set_member_lock_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY
| gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.READABLE
| gatt.Characteristic.Permissions.WRITEABLE,
value=struct.pack('B', set_member_lock),
)
characteristics.append(self.set_member_lock_characteristic)
if set_member_rank is not None:
self.set_member_rank_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('B', set_member_rank),
)
characteristics.append(self.set_member_rank_characteristic)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = CoordinatedSetIdentificationService
set_identity_resolving_key: gatt_client.CharacteristicProxy
coordinated_set_size: Optional[gatt_client.CharacteristicProxy] = None
set_member_lock: Optional[gatt_client.CharacteristicProxy] = None
set_member_rank: Optional[gatt_client.CharacteristicProxy] = None
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.set_identity_resolving_key = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC
)[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC
):
self.coordinated_set_size = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC
):
self.set_member_lock = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC
):
self.set_member_rank = characteristics[0]
+66 -43
View File
@@ -187,8 +187,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032')
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# fmt: on
# pylint: enable=line-too-long
@@ -579,7 +579,7 @@ class OobContext:
self.r = crypto.r() if r is None else r
def share(self) -> OobSharedData:
pkx = bytes(reversed(self.ecc_key.x))
pkx = self.ecc_key.x[::-1]
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
@@ -677,6 +677,13 @@ class Session:
},
}
ea: bytes
eb: bytes
ltk: bytes
preq: bytes
pres: bytes
tk: bytes
def __init__(
self,
manager: Manager,
@@ -686,15 +693,10 @@ class Session:
) -> None:
self.manager = manager
self.connection = connection
self.preq: Optional[bytes] = None
self.pres: Optional[bytes] = None
self.ea = None
self.eb = None
self.stk = None
self.ltk = None
self.ltk_ediv = 0
self.ltk_rand = bytes(8)
self.link_key = None
self.link_key: Optional[bytes] = None
self.initiator_key_distribution: int = 0
self.responder_key_distribution: int = 0
self.peer_random_value: Optional[bytes] = None
@@ -787,9 +789,7 @@ class Session:
)
self.r = pairing_config.oob.our_context.r
self.ecc_key = pairing_config.oob.our_context.ecc_key
if pairing_config.oob.legacy_context is None:
self.tk = None
else:
if pairing_config.oob.legacy_context is not None:
self.tk = pairing_config.oob.legacy_context.tk
else:
if pairing_config.oob.legacy_context is None:
@@ -807,7 +807,7 @@ class Session:
@property
def pkx(self) -> Tuple[bytes, bytes]:
return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x)
return (self.ecc_key.x[::-1], self.peer_public_key_x)
@property
def pka(self) -> bytes:
@@ -1061,8 +1061,8 @@ class Session:
def send_public_key_command(self) -> None:
self.send_command(
SMP_Pairing_Public_Key_Command(
public_key_x=bytes(reversed(self.ecc_key.x)),
public_key_y=bytes(reversed(self.ecc_key.y)),
public_key_x=self.ecc_key.x[::-1],
public_key_y=self.ecc_key.y[::-1],
)
)
@@ -1098,15 +1098,52 @@ class Session:
)
)
async def derive_ltk(self) -> None:
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
assert link_key is not None
@classmethod
def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
'''Derives Long Term Key from Link Key.
Args:
link_key: BR/EDR Link Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
LE Long Tern Key bytes in little-endian.
'''
ilk = (
crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
if self.ct2
if ct2
else crypto.h6(link_key, b'tmp2')
)
self.ltk = crypto.h6(ilk, b'brle')
return crypto.h6(ilk, b'brle')
@classmethod
def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
'''Derives Link Key from Long Term Key.
Args:
ltk: LE Long Term Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
BR/EDR Link Key bytes in little-endian.
'''
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
if ct2
else crypto.h6(ltk, b'tmp1')
)
return crypto.h6(ilk, b'lebr')
async def get_link_key_and_derive_ltk(self) -> None:
'''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
if link_key is None:
logging.warning(
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
)
self.send_pairing_failed(
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
)
else:
self.ltk = self.derive_ltk(link_key, self.ct2)
def distribute_keys(self) -> None:
# Distribute the keys as required
@@ -1117,7 +1154,7 @@ class Session:
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
):
self.ctkd_task = self.connection.abort_on(
'disconnection', self.derive_ltk()
'disconnection', self.get_link_key_and_derive_ltk()
)
elif not self.sc:
# Distribute the LTK, EDIV and RAND
@@ -1147,12 +1184,7 @@ class Session:
# CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
self.link_key = self.derive_link_key(self.ltk, self.ct2)
else:
# CTKD: Derive LTK from LinkKey
@@ -1161,7 +1193,7 @@ class Session:
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
):
self.ctkd_task = self.connection.abort_on(
'disconnection', self.derive_ltk()
'disconnection', self.get_link_key_and_derive_ltk()
)
# Distribute the LTK, EDIV and RAND
elif not self.sc:
@@ -1191,12 +1223,7 @@ class Session:
# CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
self.link_key = self.derive_link_key(self.ltk, self.ct2)
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
# Set our expectations for what to wait for in the key distribution phase
@@ -1754,14 +1781,10 @@ class Session:
self.peer_public_key_y = command.public_key_y
# Compute the DH key
self.dh_key = bytes(
reversed(
self.ecc_key.dh(
bytes(reversed(command.public_key_x)),
bytes(reversed(command.public_key_y)),
)
)
)
self.dh_key = self.ecc_key.dh(
command.public_key_x[::-1],
command.public_key_y[::-1],
)[::-1]
logger.debug(f'DH key: {self.dh_key.hex()}')
if self.pairing_method == PairingMethod.OOB:
@@ -1824,7 +1847,6 @@ class Session:
else:
self.send_pairing_dhkey_check_command()
else:
assert self.ltk
self.start_encryption(self.ltk)
def on_smp_pairing_failed_command(
@@ -1874,6 +1896,7 @@ class Manager(EventEmitter):
sessions: Dict[int, Session]
pairing_config_factory: Callable[[Connection], PairingConfig]
session_proxy: Type[Session]
_ecc_key: Optional[crypto.EccKey]
def __init__(
self,
+1 -1
View File
@@ -150,7 +150,7 @@ class PacketParser:
try:
self.sink.on_packet(bytes(self.packet))
except Exception as error:
logger.warning(
logger.exception(
color(f'!!! Exception in on_packet: {error}', 'red')
)
self.reset()
+58 -61
View File
@@ -24,9 +24,10 @@ import platform
import usb1
from .common import Transport, ParserSource
from .. import hci
from ..colors import color
from bumble.transport.common import Transport, ParserSource
from bumble import hci
from bumble.colors import color
from bumble.utils import AsyncRunner
# -----------------------------------------------------------------------------
@@ -113,7 +114,7 @@ async def open_usb_transport(spec: str) -> Transport:
def __init__(self, device, acl_out):
self.device = device
self.acl_out = acl_out
self.transfer = device.getTransfer()
self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.cancel_done = self.loop.create_future()
@@ -137,21 +138,20 @@ async def open_usb_transport(spec: str) -> Transport:
# The queue was previously empty, re-prime the pump
self.process_queue()
def on_packet_sent(self, transfer):
def transfer_callback(self, transfer):
status = transfer.getStatus()
# logger.debug(f'<<< USB out transfer callback: status={status}')
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent_)
self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else:
logger.warning(
color(f'!!! out transfer not completed: status={status}', 'red')
color(f'!!! OUT transfer not completed: status={status}', 'red')
)
def on_packet_sent_(self):
def on_packet_sent(self):
if self.packets:
self.packets.popleft()
self.process_queue()
@@ -163,22 +163,20 @@ async def open_usb_transport(spec: str) -> Transport:
packet = self.packets[0]
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk(
self.acl_out, packet[1:], callback=self.on_packet_sent
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
logger.debug('submit ACL')
self.transfer.submit()
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.transfer.setControl(
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.on_packet_sent,
callback=self.transfer_callback,
)
logger.debug('submit COMMAND')
self.transfer.submit()
self.acl_out_transfer.submit()
else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
@@ -193,11 +191,11 @@ async def open_usb_transport(spec: str) -> Transport:
self.packets.clear()
# If we have a transfer in flight, cancel it
if self.transfer.isSubmitted():
if self.acl_out_transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have
# already completed
try:
self.transfer.cancel()
self.acl_out_transfer.cancel()
logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done
@@ -206,27 +204,22 @@ async def open_usb_transport(spec: str) -> Transport:
logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, metadata, acl_in, events_in):
def __init__(self, device, metadata, acl_in, events_in):
super().__init__()
self.context = context
self.device = device
self.metadata = metadata
self.acl_in = acl_in
self.acl_in_transfer = None
self.events_in = events_in
self.events_in_transfer = None
self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue()
self.dequeue_task = None
self.closed = False
self.event_loop_done = self.loop.create_future()
self.cancel_done = {
hci.HCI_EVENT_PACKET: self.loop.create_future(),
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
}
self.events_in_transfer = None
self.acl_in_transfer = None
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.closed = False
def start(self):
# Set up transfer objects for input
@@ -234,7 +227,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.events_in_transfer.setInterrupt(
self.events_in,
READ_SIZE,
callback=self.on_packet_received,
callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET,
)
self.events_in_transfer.submit()
@@ -243,22 +236,23 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_in_transfer.setBulk(
self.acl_in,
READ_SIZE,
callback=self.on_packet_received,
callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET,
)
self.acl_in_transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue())
self.event_thread.start()
def on_packet_received(self, transfer):
@property
def usb_transfer_submitted(self):
return (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
)
def transfer_callback(self, transfer):
packet_type = transfer.getUserData()
status = transfer.getStatus()
# logger.debug(
# f'<<< USB IN transfer callback: status={status} '
# f'packet_type={packet_type} '
# f'length={transfer.getActualLength()}'
# )
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
@@ -267,18 +261,18 @@ async def open_usb_transport(spec: str) -> Transport:
+ transfer.getBuffer()[: transfer.getActualLength()]
)
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
# Re-submit the transfer so we can receive more data
transfer.submit()
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(
self.cancel_done[packet_type].set_result, None
)
return
else:
logger.warning(
color(f'!!! transfer not completed: status={status}', 'red')
color(f'!!! IN transfer not completed: status={status}', 'red')
)
# Re-submit the transfer so we can receive more data
transfer.submit()
self.loop.call_soon_threadsafe(self.on_transport_lost)
async def dequeue(self):
while not self.closed:
@@ -288,21 +282,6 @@ async def open_usb_transport(spec: str) -> Transport:
return
self.parser.feed_data(packet)
def run(self):
logger.debug('starting USB event loop')
while (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
):
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
def close(self):
self.closed = True
@@ -331,15 +310,14 @@ async def open_usb_transport(spec: str) -> Transport:
f'IN[{packet_type}] transfer likely already completed'
)
# Wait for the thread to terminate
await self.event_loop_done
class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink)
self.context = context
self.device = device
self.interface = interface
self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future()
# Get exclusive access
device.claimInterface(interface)
@@ -352,6 +330,22 @@ async def open_usb_transport(spec: str) -> Transport:
source.start()
sink.start()
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.event_thread.start()
def run(self):
logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted:
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self):
self.source.close()
self.sink.close()
@@ -361,6 +355,9 @@ async def open_usb_transport(spec: str) -> Transport:
self.device.close()
self.context.close()
# Wait for the thread to terminate
await self.event_loop_done
# Find the device according to the spec moniker
load_libusb()
context = usb1.USBContext()
@@ -540,7 +537,7 @@ async def open_usb_transport(spec: str) -> Transport:
except usb1.USBError:
logger.warning('failed to set configuration')
source = UsbPacketSource(context, device, device_metadata, acl_in, events_in)
source = UsbPacketSource(device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error:
+17 -1
View File
@@ -432,7 +432,7 @@ def wrap_async(function):
def deprecated(msg: str):
"""
Throw deprecation warning before execution
Throw deprecation warning before execution.
"""
def wrapper(function):
@@ -444,3 +444,19 @@ def deprecated(msg: str):
return inner
return wrapper
def experimental(msg: str):
"""
Throws a future warning before execution.
"""
def wrapper(function):
@wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)
return inner
return wrapper
+53 -13
View File
@@ -1,19 +1,19 @@
ANDROID REMOTE HCI APP
======================
This application allows using an android phone's built-in Bluetooth controller with
This application allows using an android phone's built-in Bluetooth controller with
a Bumble host stack running outside the phone (typically a development laptop or desktop).
The app runs an HCI proxy between a TCP socket on the "outside" and the Bluetooth HCI HAL
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
description of the Android Bluetooth HCI HAL).
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
packets coming from the controller are forwarded to the TCP socket.
Building
--------
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `RemoteHCI` top level directory.
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `extras/android/RemoteHCI` top level directory.
You can also build with Android Studio: open the `RemoteHCI` project. You can build and/or debug from there.
If the build succeeds, you can find the app APKs (debug and release) at:
@@ -25,9 +25,23 @@ If the build succeeds, you can find the app APKs (debug and release) at:
Running
-------
!!! note
In the following examples, it is assumed that shell commands are executed while in the
app's root directory, `extras/android/RemoteHCI`. If you are in a different directory,
adjust the relative paths accordingly.
### Preconditions
When the proxy starts (tapping the "Start" button in the app's main activity), it will try to
bind to the Bluetooth HAL. This requires disabling SELinux temporarily, and being the only HAL client.
When the proxy starts (tapping the "Start" button in the app's main activity, or running the proxy
from an `adb shell` command line), it will try to bind to the Bluetooth HAL.
This requires that there is no other HAL client, and requires certain privileges.
For running as a regular app, this requires disabling SELinux temporarily.
For running as a command-line executable, this just requires a root shell.
#### Root Shell
!!! tip "Restart `adb` as root"
```bash
$ adb root
```
#### Disabling SELinux
Binding to the Bluetooth HCI HAL requires certain SELinux permissions that can't simply be changed
@@ -56,8 +70,8 @@ development phone).
This state will also reset to the normal SELinux enforcement when you reboot.
#### Stopping the bluetooth process
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Android bluetooth stack process.
!!! tip "Checking if the Bluetooth process is running"
@@ -79,7 +93,33 @@ Airplane Mode, then rebooting. The bluetooth process should, in theory, not rest
$ adb shell cmd bluetooth_manager disable
```
### Starting the app
### Running as a command line app
You push the built APK to a temporary location on the phone's filesystem, then launch the command
line executable with an `adb shell` command.
!!! tip "Pushing the executable"
```bash
$ adb push app/build/outputs/apk/release/app-release-unsigned.apk /data/local/tmp/remotehci.apk
```
Do this every time you rebuild. Alternatively, you can push the `debug` APK instead:
```bash
$ adb push app/build/outputs/apk/debug/app-debug.apk /data/local/tmp/remotehci.apk
```
!!! tip "Start the proxy from the command line"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface"
```
This will run the proxy, listening on the default TCP port.
If you want a different port, pass it as a command line parameter
!!! tip "Start the proxy from the command line with a specific TCP port"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface 12345"
```
### Running as a normal app
You can start the app from the Android launcher, from Android Studio, or with `adb`
#### Launching from the launcher
@@ -103,11 +143,11 @@ automatically start the proxy, and/or set the port number.
#### Selecting a TCP port
The RemoteHCI app's main activity has a "TCP Port" setting where you can change the port on
which the proxy is accepting connections. If the default value isn't suitable, you can
which the proxy is accepting connections. If the default value isn't suitable, you can
change it there (you can also use the special value 0 to let the OS assign a port number for you).
### Connecting to the proxy
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
stack. This can be done over the phone's WiFi connection, or, alternatively, using an `adb`
TCP forward (which should be faster than over WiFi).
@@ -116,7 +156,7 @@ TCP forward (which should be faster than over WiFi).
```bash
$ adb forward tcp:<outside-port> tcp:<inside-port>
```
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
desktop machine, and <inside-port> is the TCP port selected in the app's user interface.
Those two ports may be the same, of course.
For example, with the default TCP port 9993:
@@ -125,7 +165,7 @@ TCP forward (which should be faster than over WiFi).
```
Once you've ensured that you can reach the proxy's TCP port on the phone, either directly or
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
``tcp-client:<host>:<port>`` syntax.
!!! example "Connecting a Bumble client"
+5
View File
@@ -0,0 +1,5 @@
{
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"advertising_interval": 100
}
+107
View File
@@ -0,0 +1,107 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import (
Device,
Connection,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_cig_setup.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_cig_setup.py device1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].cis_enabled = True
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC
)
cid_ids = [2, 3]
cis_handles = await devices[1].setup_cig(
cig_id=1,
cis_id=cid_ids,
sdu_interval=(10000, 0),
framing=0,
max_sdu=(120, 0),
retransmission_number=13,
max_transport_latency=(100, 0),
)
def on_cis_request(
connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int
):
connection.abort_on('disconnection', devices[0].accept_cis_request(cis_handle))
devices[0].on('cis_request', on_cis_request)
cis_links = await devices[1].create_cis(
[(cis, connection.handle) for cis in cis_handles]
)
for cis_link in cis_links:
await cis_link.disconnect()
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+87
View File
@@ -0,0 +1,87 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import dataclasses
import logging
import sys
import os
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Device, ScoLink
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hfp import DefaultCodecParameters, ESCO_PARAMETERS
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_esco_connection.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_esco_connection.py classic1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
connections = await asyncio.gather(
devices[0].accept(devices[1].public_address),
devices[1].connect(devices[0].public_address, transport=BT_BR_EDR_TRANSPORT),
)
def on_sco(sco_link: ScoLink):
connections[0].abort_on('disconnection', sco_link.disconnect())
devices[0].once('sco_connection', on_sco)
await devices[0].send_command(
HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(),
# type: ignore[call-args]
)
)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+69
View File
@@ -0,0 +1,69 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_extended_advertiser.py <config-file> <transport-spec> [type] [address]'
)
print('example: run_extended_advertiser.py device1.json usb:0')
return
if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
if len(sys.argv) >= 5:
target = Address(sys.argv[4])
else:
target = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
@@ -1,5 +1,5 @@
[versions]
agp = "8.3.0-alpha11"
agp = "8.2.0"
kotlin = "1.9.0"
core-ktx = "1.12.0"
junit = "4.13.2"
@@ -0,0 +1,57 @@
package com.github.google.bumble.remotehci
import java.io.IOException
class CommandLineInterface {
companion object {
fun printUsage() {
System.out.println("usage: <launch-command> [-h|--help] [<tcp-port>]")
}
@JvmStatic fun main(args: Array<String>) {
System.out.println("Starting proxy")
var tcpPort = DEFAULT_TCP_PORT
if (args.isNotEmpty()) {
if (args[0] == "-h" || args[0] == "--help") {
printUsage()
return
}
try {
tcpPort = args[0].toInt()
} catch (error: NumberFormatException) {
System.out.println("ERROR: invalid TCP port argument")
printUsage()
return
}
}
try {
val hciProxy = HciProxy(tcpPort, object : HciProxy.Listener {
override fun onHostConnectionState(connected: Boolean) {
}
override fun onHciPacketCountChange(
commandPacketsReceived: Int,
aclPacketsReceived: Int,
scoPacketsReceived: Int,
eventPacketsSent: Int,
aclPacketsSent: Int,
scoPacketsSent: Int
) {
}
override fun onMessage(message: String?) {
System.out.println(message)
}
})
hciProxy.run()
} catch (error: IOException) {
System.err.println("Exception while running HCI Server: $error")
} catch (error: HciProxy.HalException) {
System.err.println("HAL exception: ${error.message}")
}
}
}
}
@@ -1,5 +1,5 @@
[versions]
agp = "8.3.0-alpha11"
agp = "8.2.0"
kotlin = "1.8.10"
core-ktx = "1.9.0"
junit = "4.13.2"
+2 -2
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ url = https://github.com/google/bumble
[options]
python_requires = >=3.8
packages = bumble, bumble.transport, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools
packages = bumble, bumble.transport, bumble.transport.grpc_protobuf, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools
package_dir =
bumble = bumble
bumble.apps = apps
+74
View File
@@ -0,0 +1,74 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import pytest
import struct
import logging
from bumble import device
from bumble.profiles import csip
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_csis():
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
devices = TwoDevices()
devices[0].add_service(
csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK,
coordinated_set_size=2,
set_member_lock=csip.MemberLock.UNLOCKED,
set_member_rank=0,
)
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
csis_client = await peer.discover_service_and_create_proxy(
csip.CoordinatedSetIdentificationProxy
)
assert (
await csis_client.set_identity_resolving_key.read_value()
== bytes([csip.SirkType.PLAINTEXT]) + SIRK
)
assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2)
assert await csis_client.set_member_lock.read_value() == struct.pack(
'B', csip.MemberLock.UNLOCKED
)
assert await csis_client.set_member_rank.read_value() == struct.pack('B', 0)
# -----------------------------------------------------------------------------
async def run():
await test_csis()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
+79
View File
@@ -20,6 +20,7 @@ import logging
import os
import struct
import pytest
from unittest.mock import Mock, ANY
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
@@ -763,6 +764,83 @@ async def test_subscribe_notify():
assert not c3._called_3
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unsubscribe():
[client, server] = LinkedDevices().devices[:2]
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
characteristic2 = Characteristic(
'3234C4F4-3F34-4616-8935-45A50EE05DEB',
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
service1 = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[characteristic1, characteristic2],
)
server.add_services([service1])
mock1 = Mock()
characteristic1.on('subscription', mock1)
mock2 = Mock()
characteristic2.on('subscription', mock2)
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1
c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert len(c) == 1
c2 = c[0]
await c1.subscribe()
await async_barrier()
mock1.assert_called_once_with(ANY, True, False)
await c2.subscribe()
await async_barrier()
mock2.assert_called_once_with(ANY, True, False)
mock1.reset_mock()
await c1.unsubscribe()
await async_barrier()
mock1.assert_called_once_with(ANY, False, False)
mock2.reset_mock()
await c2.unsubscribe()
await async_barrier()
mock2.assert_called_once_with(ANY, False, False)
mock1.reset_mock()
await c1.unsubscribe()
await async_barrier()
mock1.assert_not_called()
mock2.reset_mock()
await c2.unsubscribe()
await async_barrier()
mock2.assert_not_called()
mock1.reset_mock()
await c1.unsubscribe(force=True)
await async_barrier()
mock1.assert_called_once_with(ANY, False, False)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
@@ -886,6 +964,7 @@ async def async_main():
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
await test_unsubscribe()
await test_characteristic_encoding()
await test_mtu_exchange()
+16
View File
@@ -24,6 +24,8 @@ from bumble.hci import (
HCI_RESET_COMMAND,
HCI_SUCCESS,
Address,
CodingFormat,
CodecID,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
@@ -442,6 +444,20 @@ def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Setup_ISO_Data_Path_Command():
command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000'))
assert command.connection_handle == 0x0060
assert command.data_path_direction == 0x00
assert command.data_path_id == 0x01
assert command.codec_id == CodingFormat(CodecID.TRANSPARENT)
assert command.controller_delay == 0
assert command.codec_configuration == b''
basic_check(command)
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
+3 -12
View File
@@ -21,7 +21,7 @@ import logging
import os
import pytest
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
@@ -38,7 +38,6 @@ from bumble.smp import (
OobLegacyContext,
)
from bumble.core import ProtocolError
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
from bumble.keys import PairingKeys
@@ -519,16 +518,8 @@ async def test_self_smp_over_classic():
# Mock connection
# TODO: Implement Classic SSP and encryption in link relayer
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
two_devices.devices[0].on_link_key(
two_devices.devices[1].public_address,
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.devices[1].on_link_key(
two_devices.devices[0].public_address,
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.devices[0].get_link_key = AsyncMock(return_value=LINK_KEY)
two_devices.devices[1].get_link_key = AsyncMock(return_value=LINK_KEY)
two_devices.connections[0].encryption = 1
two_devices.connections[1].encryption = 1
+68 -72
View File
@@ -16,6 +16,9 @@
# Imports
# -----------------------------------------------------------------------------
import pytest
from bumble import smp
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.pairing import OobData, OobSharedData, LeRole
from bumble.hci import Address
@@ -28,8 +31,8 @@ from bumble.core import AdvertisingData
# -----------------------------------------------------------------------------
def reversed_hex(hex_str):
return bytes(reversed(bytes.fromhex(hex_str)))
def reversed_hex(hex_str: str) -> bytes:
return bytes.fromhex(hex_str)[::-1]
# -----------------------------------------------------------------------------
@@ -129,112 +132,79 @@ def test_aes_cmac():
# -----------------------------------------------------------------------------
def test_f4():
u = bytes(
reversed(
bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
+ 'eff49111 acf4fddb cc030148 0e359de6'
)
)
u = reversed_hex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
)
v = bytes(
reversed(
bytes.fromhex(
'55188b3d 32f6bb9a 900afcfb eed4e72a'
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
)
v = reversed_hex(
'55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
z = bytes([0])
x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
z = b'\0'
value = f4(u, v, x, z)
assert bytes(reversed(value)) == bytes.fromhex(
'f2c916f1 07a9bd1c f1eda1be a974872d'
)
assert value == reversed_hex('f2c916f1 07a9bd1c f1eda1be a974872d')
# -----------------------------------------------------------------------------
def test_f5():
w = bytes(
reversed(
bytes.fromhex(
'ec0234a3 57c8ad05 341010a6 0a397d9b'
+ '99796b13 b4f866f1 868d34f3 73bfa698'
)
)
w = reversed_hex(
'ec0234a3 57c8ad05 341010a6 0a397d9b 99796b13 b4f866f1 868d34f3 73bfa698'
)
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce')))
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1')))
n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
a1 = reversed_hex('00561237 37bfce')
a2 = reversed_hex('00a71370 2dcfc1')
value = f5(w, n1, n2, a1, a2)
assert bytes(reversed(value[0])) == bytes.fromhex(
'2965f176 a1084a02 fd3f6a20 ce636e20'
)
assert bytes(reversed(value[1])) == bytes.fromhex(
'69867911 69d7cd23 980522b5 94750a38'
)
assert value[0] == reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
assert value[1] == reversed_hex('69867911 69d7cd23 980522b5 94750a38')
# -----------------------------------------------------------------------------
def test_f6():
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
mac_key = bytes(reversed(bytes.fromhex('2965f176 a1084a02 fd3f6a20 ce636e20')))
r = bytes(reversed(bytes.fromhex('12a3343b b453bb54 08da42d2 0c2d0fc8')))
io_cap = bytes(reversed(bytes.fromhex('010102')))
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce')))
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1')))
n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
mac_key = reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
r = reversed_hex('12a3343b b453bb54 08da42d2 0c2d0fc8')
io_cap = reversed_hex('010102')
a1 = reversed_hex('00561237 37bfce')
a2 = reversed_hex('00a71370 2dcfc1')
value = f6(mac_key, n1, n2, r, io_cap, a1, a2)
assert bytes(reversed(value)) == bytes.fromhex(
'e3c47398 9cd0e8c5 d26c0b09 da958f61'
)
assert value == reversed_hex('e3c47398 9cd0e8c5 d26c0b09 da958f61')
# -----------------------------------------------------------------------------
def test_g2():
u = bytes(
reversed(
bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
+ 'eff49111 acf4fddb cc030148 0e359de6'
)
)
u = reversed_hex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
)
v = bytes(
reversed(
bytes.fromhex(
'55188b3d 32f6bb9a 900afcfb eed4e72a'
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
)
v = reversed_hex(
'55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
y = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
y = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
value = g2(u, v, x, y)
assert value == 0x2F9ED5BA
# -----------------------------------------------------------------------------
def test_h6():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY_ID = bytes.fromhex('6c656272')
assert h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399')
assert h6(KEY, KEY_ID) == reversed_hex('2d9ae102 e76dc91c e8d3a9e2 80b16399')
# -----------------------------------------------------------------------------
def test_h7():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
SALT = bytes.fromhex('00000000 00000000 00000000 746D7031')
assert h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011')
assert h7(SALT, KEY) == reversed_hex('fb173597 c6a3c0ec d2998c2a 75a57011')
# -----------------------------------------------------------------------------
def test_ah():
irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')))
prand = bytes(reversed(bytes.fromhex('708194')))
irk = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
prand = reversed_hex('708194')
value = ah(irk, prand)
expected = bytes(reversed(bytes.fromhex('0dfbaa')))
expected = reversed_hex('0dfbaa')
assert value == expected
@@ -243,7 +213,7 @@ def test_oob_data():
oob_data = OobData(
address=Address("F0:F1:F2:F3:F4:F5"),
role=LeRole.BOTH_PERIPHERAL_PREFERRED,
shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])),
shared_data=OobSharedData(c=b'12', r=b'34'),
)
oob_data_ad = oob_data.to_ad()
oob_data_bytes = bytes(oob_data_ad)
@@ -255,6 +225,32 @@ def test_oob_data():
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'ct2, expected',
[
(False, 'bc1ca4ef 633fc1bd 0d8230af ee388fb0'),
(True, '287ad379 dca40253 0a39f1f4 3047b835'),
],
)
def test_ltk_to_link_key(ct2: bool, expected: str):
LTK = reversed_hex('368df9bc e3264b58 bd066c33 334fbf64')
assert smp.Session.derive_link_key(LTK, ct2) == reversed_hex(expected)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'ct2, expected',
[
(False, 'a813fb72 f1a3dfa1 8a2c9a43 f10d0a30'),
(True, 'e85e09eb 5eccb3e2 69418a13 3211bc79'),
],
)
def test_link_key_to_ltk(ct2: bool, expected: str):
LINK_KEY = reversed_hex('05040302 01000908 07060504 03020100')
assert smp.Session.derive_ltk(LINK_KEY, ct2) == reversed_hex(expected)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_ecc()
+3
View File
@@ -71,3 +71,6 @@ class TwoDevices:
# Check the post conditions
assert self.connections[0] is not None
assert self.connections[1] is not None
def __getitem__(self, index: int) -> Device:
return self.devices[index]