Compare commits

...

37 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
e7e9f9509a update rootcanal version 2024-02-02 20:33:19 -08:00
Gilles Boccon-Gibod
8d2f37aa7a inclusive language 2024-01-28 19:09:39 -08:00
Gilles Boccon-Gibod
b7b70ebcbb address PR comments 2024-01-28 19:09:37 -08:00
Gilles Boccon-Gibod
8ba91f4986 fix assert 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
79a5e953bc comply with limits for certain advertising event types 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
20de5ea250 format 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
bad9ce272c add doc 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
d3273ffa8c format (+3 squashed commits)
Squashed commits:
[60e610f] wip
[eeab73d] wip
[3cdd5b8] basic first pass
2024-01-28 19:02:30 -08:00
zxzxwu
071fc2723a Merge pull request #376 from zxzxwu/host
Manage lifecycle of CIS and SCO links in host
2024-01-28 22:09:08 +08:00
zxzxwu
ef4ea86f58 Merge pull request #381 from zxzxwu/offload
Support non-directed address generation offload
2024-01-28 22:08:32 +08:00
Gilles Boccon-Gibod
dfdaa149d0 Merge pull request #337 from google/gbg/avrcp
Add AVRCP support
2024-01-28 01:27:52 -08:00
Gilles Boccon-Gibod
986343a807 support multiple type checkers for pandora 2024-01-28 01:21:50 -08:00
Gilles Boccon-Gibod
5211d7ba96 revert to older pytest_asyncio 2024-01-28 01:10:31 -08:00
Gilles Boccon-Gibod
a167342778 deal with SupportsBytes for python <= 3.10 2024-01-28 01:04:13 -08:00
Gilles Boccon-Gibod
1efb8cdbee use matrixed python version 2024-01-28 00:34:42 -08:00
Gilles Boccon-Gibod
80d83e6a70 upgrade to mypy 1.8.0 2024-01-28 00:26:50 -08:00
Gilles Boccon-Gibod
31ec1c41ce cleanup 2024-01-28 00:07:31 -08:00
Gilles Boccon-Gibod
aba1ac0cea use a dict instead of a series of ifs (+6 squashed commits)
Squashed commits:
[90f2024] fix import order
[0edd321] add a few docstrings
[77a0ac0] wip
[adcf159] wip
[96cbd67] wip
[d8bfbab] wip (+1 squashed commit)
Squashed commits:
[43b4d66] wip (+2 squashed commits)
Squashed commits:
[3dafaa8] wip
[5844026] wip (+1 squashed commit)
Squashed commits:
[4cbb35a] wip (+1 squashed commit)
Squashed commits:
[4d2b6d3] wip (+4 squashed commits)
Squashed commits:
[f2da510] wip
[318c119] wip
[923b4eb] wip
[9d46365] wip

use a dict instead of a series of ifs (+6 squashed commits)
Squashed commits:
[90f2024] fix import order
[0edd321] add a few docstrings
[77a0ac0] wip
[adcf159] wip
[96cbd67] wip
[d8bfbab] wip
2024-01-27 16:26:17 -08:00
Josh Wu
c40824e51c Support non-directed address generation offload 2024-01-26 16:02:40 +08:00
Gilles Boccon-Gibod
2920f05dae Merge pull request #411 from AlanRosenthal/main
Add bumble-controller-loopback console_scripts
2024-01-24 13:20:19 -08:00
Alan Rosenthal
bc911d6da0 Add bumble-controller-loopback console_scripts 2024-01-24 14:07:35 -05:00
Gilles Boccon-Gibod
4f87f587e4 Merge pull request #409 from google/gbg/root-canal-update
update to rootcanal 1.4
2024-01-22 15:07:20 -08:00
Gilles Boccon-Gibod
3e38ab3638 update to rootcanal 1.4 2024-01-22 12:19:12 -08:00
Gilles Boccon-Gibod
21bb911fea Merge pull request #408 from suneeshs/btbench-update
Update the Bumble BT Bench AndroidManifest.xml
2024-01-22 12:15:35 -08:00
Suneesh Sasikumar
744dfa33a2 Update the Bumble BT Bench AndroidManifest.xml 2024-01-22 13:46:55 -05:00
zxzxwu
ec5f8535a8 Merge pull request #405 from zxzxwu/adv
Make Advertisement dataclass
2024-01-20 11:05:41 +08:00
Gilles Boccon-Gibod
5a83734a00 Merge pull request #388 from google/gbg/scan-with-irk
allow passing IRKs as arguments to scan.py
2024-01-19 11:37:02 -08:00
Josh Wu
b4ae8af3a7 Typing Advertisement 2024-01-19 15:16:24 +08:00
Josh Wu
da60386385 Manage lifecycle of CIS and SCO links in host 2024-01-18 11:56:38 +08:00
zxzxwu
45c4c4f4c5 Merge pull request #404 from zxzxwu/cis
Fix HCI_LE_Set_Host_Feature_Command
2024-01-18 10:56:05 +08:00
zxzxwu
9187c75d68 Merge pull request #397 from zxzxwu/controller
Controller: CIS implementation
2024-01-18 10:55:37 +08:00
zxzxwu
abeec22546 Merge pull request #402 from zxzxwu/key
Save Link Key in CTKD over BR/EDR
2024-01-18 10:55:14 +08:00
Josh Wu
a6bab755cf Fix HCI_LE_Set_Host_Feature_Command 2024-01-17 22:15:15 +08:00
Josh Wu
acd9d994c3 Save link_key in CTKD over BR/EDR
Since keystore.update() overwrites all existing keys, the existing link
key will be wiped out. To avoid this, SMP also need to keep the key.
2024-01-17 19:30:02 +08:00
Gilles Boccon-Gibod
37afda3ed3 Merge pull request #399 from google/gbg/hotfix-002
fix uninitialized variable
2024-01-16 17:29:48 -08:00
Josh Wu
fa4df6e3a2 Controller: CIS implementation 2024-01-11 01:16:42 +08:00
Gilles Boccon-Gibod
d43281c57e allow passing IRKs as arguments 2023-12-28 14:35:23 -08:00
40 changed files with 5668 additions and 950 deletions

View File

@@ -29,7 +29,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: '3.10'
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip

View File

@@ -12,7 +12,9 @@
"ASHA",
"asyncio",
"ATRAC",
"avctp",
"avdtp",
"avrcp",
"bitpool",
"bitstruct",
"BSCP",

View File

@@ -26,7 +26,7 @@ from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver
from bumble.device import Advertisement
from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
from bumble.hci import Address, HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
# -----------------------------------------------------------------------------
@@ -66,10 +66,15 @@ class AdvertisementPrinter:
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
address.address_type
]
if address.is_public:
type_color = 'cyan'
if address.address_type in (
Address.RANDOM_IDENTITY_ADDRESS,
Address.PUBLIC_IDENTITY_ADDRESS,
):
type_color = 'yellow'
else:
if address.is_static:
if address.is_public:
type_color = 'cyan'
elif address.is_static:
type_color = 'green'
address_qualifier = '(static)'
elif address.is_resolvable:
@@ -116,6 +121,7 @@ async def scan(
phy,
filter_duplicates,
raw,
irks,
keystore_file,
device_config,
transport,
@@ -140,9 +146,21 @@ async def scan(
if device.keystore:
resolving_keys = await device.keystore.get_resolving_keys()
resolver = AddressResolver(resolving_keys)
else:
resolver = None
resolving_keys = []
for irk_and_address in irks:
if ':' not in irk_and_address:
raise ValueError('invalid IRK:ADDRESS value')
irk_hex, address_str = irk_and_address.split(':', 1)
resolving_keys.append(
(
bytes.fromhex(irk_hex),
Address(address_str, Address.RANDOM_DEVICE_ADDRESS),
)
)
resolver = AddressResolver(resolving_keys) if resolving_keys else None
printer = AdvertisementPrinter(min_rssi, resolver)
if raw:
@@ -187,8 +205,24 @@ async def scan(
default=False,
help='Listen for raw advertising reports instead of processed ones',
)
@click.option('--keystore-file', help='Keystore file to use when resolving addresses')
@click.option('--device-config', help='Device config file for the scanning device')
@click.option(
'--irk',
metavar='<IRK_HEX>:<ADDRESS>',
help=(
'Use this IRK for resolving private addresses ' '(may be used more than once)'
),
multiple=True,
)
@click.option(
'--keystore-file',
metavar='FILE_PATH',
help='Keystore file to use when resolving addresses',
)
@click.option(
'--device-config',
metavar='FILE_PATH',
help='Device config file for the scanning device',
)
@click.argument('transport')
def main(
min_rssi,
@@ -198,6 +232,7 @@ def main(
phy,
filter_duplicates,
raw,
irk,
keystore_file,
device_config,
transport,
@@ -212,6 +247,7 @@ def main(
phy,
filter_duplicates,
raw,
irk,
keystore_file,
device_config,
transport,

View File

@@ -184,8 +184,12 @@ def make_audio_source_service_sdp_records(service_record_handle, version=(1, 3))
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE),
DataElement.unsigned_integer_16(version_int),
DataElement.sequence(
[
DataElement.uuid(BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE),
DataElement.unsigned_integer_16(version_int),
]
)
]
),
),
@@ -234,8 +238,12 @@ def make_audio_sink_service_sdp_records(service_record_handle, version=(1, 3)):
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE),
DataElement.unsigned_integer_16(version_int),
DataElement.sequence(
[
DataElement.uuid(BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE),
DataElement.unsigned_integer_16(version_int),
]
)
]
),
),

520
bumble/avc.py Normal file
View File

@@ -0,0 +1,520 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import Dict, Type, Union, Tuple
from bumble.utils import OpenIntEnum
# -----------------------------------------------------------------------------
class Frame:
class SubunitType(enum.IntEnum):
# AV/C Digital Interface Command Set General Specification Version 4.1
# Table 7.4
MONITOR = 0x00
AUDIO = 0x01
PRINTER = 0x02
DISC = 0x03
TAPE_RECORDER_OR_PLAYER = 0x04
TUNER = 0x05
CA = 0x06
CAMERA = 0x07
PANEL = 0x09
BULLETIN_BOARD = 0x0A
VENDOR_UNIQUE = 0x1C
EXTENDED = 0x1E
UNIT = 0x1F
class OperationCode(OpenIntEnum):
# 0x00 - 0x0F: Unit and subunit commands
VENDOR_DEPENDENT = 0x00
RESERVE = 0x01
PLUG_INFO = 0x02
# 0x10 - 0x3F: Unit commands
DIGITAL_OUTPUT = 0x10
DIGITAL_INPUT = 0x11
CHANNEL_USAGE = 0x12
OUTPUT_PLUG_SIGNAL_FORMAT = 0x18
INPUT_PLUG_SIGNAL_FORMAT = 0x19
GENERAL_BUS_SETUP = 0x1F
CONNECT_AV = 0x20
DISCONNECT_AV = 0x21
CONNECTIONS = 0x22
CONNECT = 0x24
DISCONNECT = 0x25
UNIT_INFO = 0x30
SUBUNIT_INFO = 0x31
# 0x40 - 0x7F: Subunit commands
PASS_THROUGH = 0x7C
GUI_UPDATE = 0x7D
PUSH_GUI_DATA = 0x7E
USER_ACTION = 0x7F
# 0xA0 - 0xBF: Unit and subunit commands
VERSION = 0xB0
POWER = 0xB2
subunit_type: SubunitType
subunit_id: int
opcode: OperationCode
operands: bytes
@staticmethod
def subclass(subclass):
# Infer the opcode from the class name
if subclass.__name__.endswith("CommandFrame"):
short_name = subclass.__name__.replace("CommandFrame", "")
category_class = CommandFrame
elif subclass.__name__.endswith("ResponseFrame"):
short_name = subclass.__name__.replace("ResponseFrame", "")
category_class = ResponseFrame
else:
raise ValueError(f"invalid subclass name {subclass.__name__}")
uppercase_indexes = [
i for i in range(len(short_name)) if short_name[i].isupper()
]
uppercase_indexes.append(len(short_name))
words = [
short_name[uppercase_indexes[i] : uppercase_indexes[i + 1]].upper()
for i in range(len(uppercase_indexes) - 1)
]
opcode_name = "_".join(words)
opcode = Frame.OperationCode[opcode_name]
category_class.subclasses[opcode] = subclass
return subclass
@staticmethod
def from_bytes(data: bytes) -> Frame:
if data[0] >> 4 != 0:
raise ValueError("first 4 bits must be 0s")
ctype_or_response = data[0] & 0xF
subunit_type = Frame.SubunitType(data[1] >> 3)
subunit_id = data[1] & 7
if subunit_type == Frame.SubunitType.EXTENDED:
# Not supported
raise NotImplementedError("extended subunit types not supported")
if subunit_id < 5:
opcode_offset = 2
elif subunit_id == 5:
# Extended to the next byte
extension = data[2]
if extension == 0:
raise ValueError("extended subunit ID value reserved")
if extension == 0xFF:
subunit_id = 5 + 254 + data[3]
opcode_offset = 4
else:
subunit_id = 5 + extension
opcode_offset = 3
elif subunit_id == 6:
raise ValueError("reserved subunit ID")
opcode = Frame.OperationCode(data[opcode_offset])
operands = data[opcode_offset + 1 :]
# Look for a registered subclass
if ctype_or_response < 8:
# Command
ctype = CommandFrame.CommandType(ctype_or_response)
if c_subclass := CommandFrame.subclasses.get(opcode):
return c_subclass(
ctype,
subunit_type,
subunit_id,
*c_subclass.parse_operands(operands),
)
return CommandFrame(ctype, subunit_type, subunit_id, opcode, operands)
else:
# Response
response = ResponseFrame.ResponseCode(ctype_or_response)
if r_subclass := ResponseFrame.subclasses.get(opcode):
return r_subclass(
response,
subunit_type,
subunit_id,
*r_subclass.parse_operands(operands),
)
return ResponseFrame(response, subunit_type, subunit_id, opcode, operands)
def to_bytes(
self,
ctype_or_response: Union[CommandFrame.CommandType, ResponseFrame.ResponseCode],
) -> bytes:
# TODO: support extended subunit types and ids.
return (
bytes(
[
ctype_or_response,
self.subunit_type << 3 | self.subunit_id,
self.opcode,
]
)
+ self.operands
)
def to_string(self, extra: str) -> str:
return (
f"{self.__class__.__name__}({extra}"
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"opcode={self.opcode.name}, "
f"operands={self.operands.hex()})"
)
def __init__(
self,
subunit_type: SubunitType,
subunit_id: int,
opcode: OperationCode,
operands: bytes,
) -> None:
self.subunit_type = subunit_type
self.subunit_id = subunit_id
self.opcode = opcode
self.operands = operands
# -----------------------------------------------------------------------------
class CommandFrame(Frame):
class CommandType(OpenIntEnum):
# AV/C Digital Interface Command Set General Specification Version 4.1
# Table 7.1
CONTROL = 0x00
STATUS = 0x01
SPECIFIC_INQUIRY = 0x02
NOTIFY = 0x03
GENERAL_INQUIRY = 0x04
subclasses: Dict[Frame.OperationCode, Type[CommandFrame]] = {}
ctype: CommandType
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
raise NotImplementedError
def __init__(
self,
ctype: CommandType,
subunit_type: Frame.SubunitType,
subunit_id: int,
opcode: Frame.OperationCode,
operands: bytes,
) -> None:
super().__init__(subunit_type, subunit_id, opcode, operands)
self.ctype = ctype
def __bytes__(self):
return self.to_bytes(self.ctype)
def __str__(self):
return self.to_string(f"ctype={self.ctype.name}, ")
# -----------------------------------------------------------------------------
class ResponseFrame(Frame):
class ResponseCode(OpenIntEnum):
# AV/C Digital Interface Command Set General Specification Version 4.1
# Table 7.2
NOT_IMPLEMENTED = 0x08
ACCEPTED = 0x09
REJECTED = 0x0A
IN_TRANSITION = 0x0B
IMPLEMENTED_OR_STABLE = 0x0C
CHANGED = 0x0D
INTERIM = 0x0F
subclasses: Dict[Frame.OperationCode, Type[ResponseFrame]] = {}
response: ResponseCode
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
raise NotImplementedError
def __init__(
self,
response: ResponseCode,
subunit_type: Frame.SubunitType,
subunit_id: int,
opcode: Frame.OperationCode,
operands: bytes,
) -> None:
super().__init__(subunit_type, subunit_id, opcode, operands)
self.response = response
def __bytes__(self):
return self.to_bytes(self.response)
def __str__(self):
return self.to_string(f"response={self.response.name}, ")
# -----------------------------------------------------------------------------
class VendorDependentFrame:
company_id: int
vendor_dependent_data: bytes
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
return (
struct.unpack(">I", b"\x00" + operands[:3])[0],
operands[3:],
)
def make_operands(self) -> bytes:
return struct.pack(">I", self.company_id)[1:] + self.vendor_dependent_data
def __init__(self, company_id: int, vendor_dependent_data: bytes):
self.company_id = company_id
self.vendor_dependent_data = vendor_dependent_data
# -----------------------------------------------------------------------------
@Frame.subclass
class VendorDependentCommandFrame(VendorDependentFrame, CommandFrame):
def __init__(
self,
ctype: CommandFrame.CommandType,
subunit_type: Frame.SubunitType,
subunit_id: int,
company_id: int,
vendor_dependent_data: bytes,
) -> None:
VendorDependentFrame.__init__(self, company_id, vendor_dependent_data)
CommandFrame.__init__(
self,
ctype,
subunit_type,
subunit_id,
Frame.OperationCode.VENDOR_DEPENDENT,
self.make_operands(),
)
def __str__(self):
return (
f"VendorDependentCommandFrame(ctype={self.ctype.name}, "
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"company_id=0x{self.company_id:06X}, "
f"vendor_dependent_data={self.vendor_dependent_data.hex()})"
)
# -----------------------------------------------------------------------------
@Frame.subclass
class VendorDependentResponseFrame(VendorDependentFrame, ResponseFrame):
def __init__(
self,
response: ResponseFrame.ResponseCode,
subunit_type: Frame.SubunitType,
subunit_id: int,
company_id: int,
vendor_dependent_data: bytes,
) -> None:
VendorDependentFrame.__init__(self, company_id, vendor_dependent_data)
ResponseFrame.__init__(
self,
response,
subunit_type,
subunit_id,
Frame.OperationCode.VENDOR_DEPENDENT,
self.make_operands(),
)
def __str__(self):
return (
f"VendorDependentResponseFrame(response={self.response.name}, "
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"company_id=0x{self.company_id:06X}, "
f"vendor_dependent_data={self.vendor_dependent_data.hex()})"
)
# -----------------------------------------------------------------------------
class PassThroughFrame:
"""
See AV/C Panel Subunit Specification 1.1 - 9.4 PASS THROUGH control command
"""
class StateFlag(enum.IntEnum):
PRESSED = 0
RELEASED = 1
class OperationId(OpenIntEnum):
SELECT = 0x00
UP = 0x01
DOWN = 0x01
LEFT = 0x03
RIGHT = 0x04
RIGHT_UP = 0x05
RIGHT_DOWN = 0x06
LEFT_UP = 0x07
LEFT_DOWN = 0x08
ROOT_MENU = 0x09
SETUP_MENU = 0x0A
CONTENTS_MENU = 0x0B
FAVORITE_MENU = 0x0C
EXIT = 0x0D
NUMBER_0 = 0x20
NUMBER_1 = 0x21
NUMBER_2 = 0x22
NUMBER_3 = 0x23
NUMBER_4 = 0x24
NUMBER_5 = 0x25
NUMBER_6 = 0x26
NUMBER_7 = 0x27
NUMBER_8 = 0x28
NUMBER_9 = 0x29
DOT = 0x2A
ENTER = 0x2B
CLEAR = 0x2C
CHANNEL_UP = 0x30
CHANNEL_DOWN = 0x31
PREVIOUS_CHANNEL = 0x32
SOUND_SELECT = 0x33
INPUT_SELECT = 0x34
DISPLAY_INFORMATION = 0x35
HELP = 0x36
PAGE_UP = 0x37
PAGE_DOWN = 0x38
POWER = 0x40
VOLUME_UP = 0x41
VOLUME_DOWN = 0x42
MUTE = 0x43
PLAY = 0x44
STOP = 0x45
PAUSE = 0x46
RECORD = 0x47
REWIND = 0x48
FAST_FORWARD = 0x49
EJECT = 0x4A
FORWARD = 0x4B
BACKWARD = 0x4C
ANGLE = 0x50
SUBPICTURE = 0x51
F1 = 0x71
F2 = 0x72
F3 = 0x73
F4 = 0x74
F5 = 0x75
VENDOR_UNIQUE = 0x7E
state_flag: StateFlag
operation_id: OperationId
operation_data: bytes
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
return (
PassThroughFrame.StateFlag(operands[0] >> 7),
PassThroughFrame.OperationId(operands[0] & 0x7F),
operands[1 : 1 + operands[1]],
)
def make_operands(self):
return (
bytes([self.state_flag << 7 | self.operation_id, len(self.operation_data)])
+ self.operation_data
)
def __init__(
self,
state_flag: StateFlag,
operation_id: OperationId,
operation_data: bytes,
) -> None:
if len(operation_data) > 255:
raise ValueError("operation data must be <= 255 bytes")
self.state_flag = state_flag
self.operation_id = operation_id
self.operation_data = operation_data
# -----------------------------------------------------------------------------
@Frame.subclass
class PassThroughCommandFrame(PassThroughFrame, CommandFrame):
def __init__(
self,
ctype: CommandFrame.CommandType,
subunit_type: Frame.SubunitType,
subunit_id: int,
state_flag: PassThroughFrame.StateFlag,
operation_id: PassThroughFrame.OperationId,
operation_data: bytes,
) -> None:
PassThroughFrame.__init__(self, state_flag, operation_id, operation_data)
CommandFrame.__init__(
self,
ctype,
subunit_type,
subunit_id,
Frame.OperationCode.PASS_THROUGH,
self.make_operands(),
)
def __str__(self):
return (
f"PassThroughCommandFrame(ctype={self.ctype.name}, "
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"state_flag={self.state_flag.name}, "
f"operation_id={self.operation_id.name}, "
f"operation_data={self.operation_data.hex()})"
)
# -----------------------------------------------------------------------------
@Frame.subclass
class PassThroughResponseFrame(PassThroughFrame, ResponseFrame):
def __init__(
self,
response: ResponseFrame.ResponseCode,
subunit_type: Frame.SubunitType,
subunit_id: int,
state_flag: PassThroughFrame.StateFlag,
operation_id: PassThroughFrame.OperationId,
operation_data: bytes,
) -> None:
PassThroughFrame.__init__(self, state_flag, operation_id, operation_data)
ResponseFrame.__init__(
self,
response,
subunit_type,
subunit_id,
Frame.OperationCode.PASS_THROUGH,
self.make_operands(),
)
def __str__(self):
return (
f"PassThroughResponseFrame(response={self.response.name}, "
f"subunit_type={self.subunit_type.name}, "
f"subunit_id=0x{self.subunit_id:02X}, "
f"state_flag={self.state_flag.name}, "
f"operation_id={self.operation_id.name}, "
f"operation_data={self.operation_data.hex()})"
)

291
bumble/avctp.py Normal file
View File

@@ -0,0 +1,291 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from enum import IntEnum
import logging
import struct
from typing import Callable, cast, Dict, Optional
from bumble.colors import color
from bumble import avc
from bumble import l2cap
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
AVCTP_PSM = 0x0017
AVCTP_BROWSING_PSM = 0x001B
# -----------------------------------------------------------------------------
class MessageAssembler:
Callback = Callable[[int, bool, bool, int, bytes], None]
transaction_label: int
pid: int
c_r: int
ipid: int
payload: bytes
number_of_packets: int
packets_received: int
def __init__(self, callback: Callback) -> None:
self.callback = callback
self.reset()
def reset(self) -> None:
self.packets_received = 0
self.transaction_label = -1
self.pid = -1
self.c_r = -1
self.ipid = -1
self.payload = b''
self.number_of_packets = 0
self.packet_count = 0
def on_pdu(self, pdu: bytes) -> None:
self.packets_received += 1
transaction_label = pdu[0] >> 4
packet_type = Protocol.PacketType((pdu[0] >> 2) & 3)
c_r = (pdu[0] >> 1) & 1
ipid = pdu[0] & 1
if c_r == 0 and ipid != 0:
logger.warning("invalid IPID in command frame")
self.reset()
return
pid_offset = 1
if packet_type in (Protocol.PacketType.SINGLE, Protocol.PacketType.START):
if self.transaction_label >= 0:
# We are already in a transaction
logger.warning("received START or SINGLE fragment while in transaction")
self.reset()
self.packets_received = 1
if packet_type == Protocol.PacketType.START:
self.number_of_packets = pdu[1]
pid_offset = 2
pid = struct.unpack_from(">H", pdu, pid_offset)[0]
self.payload += pdu[pid_offset + 2 :]
if packet_type in (Protocol.PacketType.CONTINUE, Protocol.PacketType.END):
if transaction_label != self.transaction_label:
logger.warning("transaction label does not match")
self.reset()
return
if pid != self.pid:
logger.warning("PID does not match")
self.reset()
return
if c_r != self.c_r:
logger.warning("C/R does not match")
self.reset()
return
if self.packets_received > self.number_of_packets:
logger.warning("too many fragments in transaction")
self.reset()
return
if packet_type == Protocol.PacketType.END:
if self.packets_received != self.number_of_packets:
logger.warning("premature END")
self.reset()
return
else:
self.transaction_label = transaction_label
self.c_r = c_r
self.ipid = ipid
self.pid = pid
if packet_type in (Protocol.PacketType.SINGLE, Protocol.PacketType.END):
self.on_message_complete()
def on_message_complete(self):
try:
self.callback(
self.transaction_label,
self.c_r == 0,
self.ipid != 0,
self.pid,
self.payload,
)
except Exception as error:
logger.exception(color(f"!!! exception in callback: {error}", "red"))
self.reset()
# -----------------------------------------------------------------------------
class Protocol:
CommandHandler = Callable[[int, avc.CommandFrame], None]
command_handlers: Dict[int, CommandHandler] # Command handlers, by PID
ResponseHandler = Callable[[int, Optional[avc.ResponseFrame]], None]
response_handlers: Dict[int, ResponseHandler] # Response handlers, by PID
next_transaction_label: int
message_assembler: MessageAssembler
class PacketType(IntEnum):
SINGLE = 0b00
START = 0b01
CONTINUE = 0b10
END = 0b11
def __init__(self, l2cap_channel: l2cap.ClassicChannel) -> None:
self.command_handlers = {}
self.response_handlers = {}
self.l2cap_channel = l2cap_channel
self.message_assembler = MessageAssembler(self.on_message)
# Register to receive PDUs from the channel
l2cap_channel.sink = self.on_pdu
l2cap_channel.on("open", self.on_l2cap_channel_open)
l2cap_channel.on("close", self.on_l2cap_channel_close)
def on_l2cap_channel_open(self):
logger.debug(color("<<< AVCTP channel open", "magenta"))
def on_l2cap_channel_close(self):
logger.debug(color("<<< AVCTP channel closed", "magenta"))
def on_pdu(self, pdu: bytes) -> None:
self.message_assembler.on_pdu(pdu)
def on_message(
self,
transaction_label: int,
is_command: bool,
ipid: bool,
pid: int,
payload: bytes,
) -> None:
logger.debug(
f"<<< AVCTP Message: pid={pid}, "
f"transaction_label={transaction_label}, "
f"is_command={is_command}, "
f"ipid={ipid}, "
f"payload={payload.hex()}"
)
# Check for invalid PID responses.
if ipid:
logger.debug(f"received IPID for PID={pid}")
# Find the appropriate handler.
if is_command:
if pid not in self.command_handlers:
logger.warning(f"no command handler for PID {pid}")
self.send_ipid(transaction_label, pid)
return
command_frame = cast(avc.CommandFrame, avc.Frame.from_bytes(payload))
self.command_handlers[pid](transaction_label, command_frame)
else:
if pid not in self.response_handlers:
logger.warning(f"no response handler for PID {pid}")
return
# By convention, for an ipid, send a None payload to the response handler.
if ipid:
response_frame = None
else:
response_frame = cast(avc.ResponseFrame, avc.Frame.from_bytes(payload))
self.response_handlers[pid](transaction_label, response_frame)
def send_message(
self,
transaction_label: int,
is_command: bool,
ipid: bool,
pid: int,
payload: bytes,
):
# TODO: fragment large messages
packet_type = Protocol.PacketType.SINGLE
pdu = (
struct.pack(
">BH",
transaction_label << 4
| packet_type << 2
| (0 if is_command else 1) << 1
| (1 if ipid else 0),
pid,
)
+ payload
)
self.l2cap_channel.send_pdu(pdu)
def send_command(self, transaction_label: int, pid: int, payload: bytes) -> None:
logger.debug(
">>> AVCTP command: "
f"transaction_label={transaction_label}, "
f"pid={pid}, "
f"payload={payload.hex()}"
)
self.send_message(transaction_label, True, False, pid, payload)
def send_response(self, transaction_label: int, pid: int, payload: bytes):
logger.debug(
">>> AVCTP response: "
f"transaction_label={transaction_label}, "
f"pid={pid}, "
f"payload={payload.hex()}"
)
self.send_message(transaction_label, False, False, pid, payload)
def send_ipid(self, transaction_label: int, pid: int) -> None:
logger.debug(
">>> AVCTP ipid: " f"transaction_label={transaction_label}, " f"pid={pid}"
)
self.send_message(transaction_label, False, True, pid, b'')
def register_command_handler(
self, pid: int, handler: Protocol.CommandHandler
) -> None:
self.command_handlers[pid] = handler
def unregister_command_handler(
self, pid: int, handler: Protocol.CommandHandler
) -> None:
if pid not in self.command_handlers or self.command_handlers[pid] != handler:
raise ValueError("command handler not registered")
del self.command_handlers[pid]
def register_response_handler(
self, pid: int, handler: Protocol.ResponseHandler
) -> None:
self.response_handlers[pid] = handler
def unregister_response_handler(
self, pid: int, handler: Protocol.ResponseHandler
) -> None:
if pid not in self.response_handlers or self.response_handlers[pid] != handler:
raise ValueError("response handler not registered")
del self.response_handlers[pid]

View File

@@ -241,7 +241,10 @@ async def find_avdtp_service_with_sdp_client(
)
if profile_descriptor_list:
for profile_descriptor in profile_descriptor_list.value:
if len(profile_descriptor.value) >= 2:
if (
profile_descriptor.type == sdp.DataElement.SEQUENCE
and len(profile_descriptor.value) >= 2
):
avdtp_version_major = profile_descriptor.value[1].value >> 8
avdtp_version_minor = profile_descriptor.value[1].value & 0xFF
return (avdtp_version_major, avdtp_version_minor)
@@ -511,7 +514,8 @@ class MessageAssembler:
try:
self.callback(self.transaction_label, message)
except Exception as error:
logger.warning(color(f'!!! exception in callback: {error}'))
logger.exception(color(f'!!! exception in callback: {error}', 'red'))
self.reset()

1916
bumble/avrcp.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -57,6 +57,8 @@ from bumble.hci import (
HCI_Encryption_Change_Event,
HCI_Synchronous_Connection_Complete_Event,
HCI_LE_Advertising_Report_Event,
HCI_LE_CIS_Established_Event,
HCI_LE_CIS_Request_Event,
HCI_LE_Connection_Complete_Event,
HCI_LE_Read_Remote_Features_Complete_Event,
HCI_Number_Of_Completed_Packets_Event,
@@ -82,6 +84,15 @@ class DataObject:
pass
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
handle: int
cis_id: int
cig_id: int
acl_connection: Optional[Connection] = None
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class Connection:
@@ -132,6 +143,8 @@ class Controller:
self.classic_connections: Dict[
Address, Connection
] = {} # Connections in BR/EDR
self.central_cis_links: Dict[int, CisLink] = {} # CIS links by handle
self.peripheral_cis_links: Dict[int, CisLink] = {} # CIS links by handle
self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
self.hci_revision = 0
@@ -310,7 +323,7 @@ class Controller:
############################################################
# Link connections
############################################################
def allocate_connection_handle(self):
def allocate_connection_handle(self) -> int:
handle = 0
max_handle = 0
for connection in itertools.chain(
@@ -322,6 +335,13 @@ class Controller:
if connection.handle == handle:
# Already used, continue searching after the current max
handle = max_handle + 1
for cis_handle in itertools.chain(
self.central_cis_links.keys(), self.peripheral_cis_links.keys()
):
max_handle = max(max_handle, cis_handle)
if cis_handle == handle:
# Already used, continue searching after the current max
handle = max_handle + 1
return handle
def find_le_connection_by_address(self, address):
@@ -549,6 +569,104 @@ class Controller:
)
self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))
def on_link_cis_request(
self, central_address: Address, cig_id: int, cis_id: int
) -> None:
'''
Called when an incoming CIS request occurs from a central on the link
'''
connection = self.peripheral_connections.get(central_address)
assert connection
pending_cis_link = CisLink(
handle=self.allocate_connection_handle(),
cis_id=cis_id,
cig_id=cig_id,
acl_connection=connection,
)
self.peripheral_cis_links[pending_cis_link.handle] = pending_cis_link
self.send_hci_packet(
HCI_LE_CIS_Request_Event(
acl_connection_handle=connection.handle,
cis_connection_handle=pending_cis_link.handle,
cig_id=cig_id,
cis_id=cis_id,
)
)
def on_link_cis_established(self, cig_id: int, cis_id: int) -> None:
'''
Called when an incoming CIS established.
'''
cis_link = next(
cis_link
for cis_link in itertools.chain(
self.central_cis_links.values(), self.peripheral_cis_links.values()
)
if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
)
self.send_hci_packet(
HCI_LE_CIS_Established_Event(
status=HCI_SUCCESS,
connection_handle=cis_link.handle,
# CIS parameters are ignored.
cig_sync_delay=0,
cis_sync_delay=0,
transport_latency_c_to_p=0,
transport_latency_p_to_c=0,
phy_c_to_p=0,
phy_p_to_c=0,
nse=0,
bn_c_to_p=0,
bn_p_to_c=0,
ft_c_to_p=0,
ft_p_to_c=0,
max_pdu_c_to_p=0,
max_pdu_p_to_c=0,
iso_interval=0,
)
)
def on_link_cis_disconnected(self, cig_id: int, cis_id: int) -> None:
'''
Called when a CIS disconnected.
'''
if cis_link := next(
(
cis_link
for cis_link in self.peripheral_cis_links.values()
if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
),
None,
):
# Remove peripheral CIS on disconnection.
self.peripheral_cis_links.pop(cis_link.handle)
elif cis_link := next(
(
cis_link
for cis_link in self.central_cis_links.values()
if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
),
None,
):
# Keep central CIS on disconnection. They should be removed by HCI_LE_Remove_CIG_Command.
cis_link.acl_connection = None
else:
return
self.send_hci_packet(
HCI_Disconnection_Complete_Event(
status=HCI_SUCCESS,
connection_handle=cis_link.handle,
reason=HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
)
)
############################################################
# Classic link connections
############################################################
@@ -769,6 +887,17 @@ class Controller:
else:
# Remove the connection
del self.classic_connections[connection.peer_address]
elif cis_link := (
self.central_cis_links.get(handle) or self.peripheral_cis_links.get(handle)
):
if self.link:
self.link.disconnect_cis(
initiator_controller=self,
peer_address=cis_link.acl_connection.peer_address,
cig_id=cis_link.cig_id,
cis_id=cis_link.cis_id,
)
# Spec requires handle to be kept after disconnection.
def on_hci_accept_connection_request_command(self, command):
'''
@@ -1399,6 +1528,107 @@ class Controller:
'''
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)
def on_hci_le_set_cig_parameters_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.97 LE Set CIG Parameter Command
'''
# Remove old CIG implicitly.
for handle, cis_link in self.central_cis_links.items():
if cis_link.cig_id == command.cig_id:
self.central_cis_links.pop(handle)
handles = []
for cis_id in command.cis_id:
handle = self.allocate_connection_handle()
handles.append(handle)
self.central_cis_links[handle] = CisLink(
cis_id=cis_id,
cig_id=command.cig_id,
handle=handle,
)
return struct.pack(
'<BBB', HCI_SUCCESS, command.cig_id, len(handles)
) + b''.join([struct.pack('<H', handle) for handle in handles])
def on_hci_le_create_cis_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.99 LE Create CIS Command
'''
if not self.link:
return
for cis_handle, acl_handle in zip(
command.cis_connection_handle, command.acl_connection_handle
):
if not (connection := self.find_connection_by_handle(acl_handle)):
logger.error(f'Cannot find connection with handle={acl_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
if not (cis_link := self.central_cis_links.get(cis_handle)):
logger.error(f'Cannot find CIS with handle={cis_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
cis_link.acl_connection = connection
self.link.create_cis(
self,
peripheral_address=connection.peer_address,
cig_id=cis_link.cig_id,
cis_id=cis_link.cis_id,
)
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
def on_hci_le_remove_cig_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.100 LE Remove CIG Command
'''
status = HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR
for cis_handle, cis_link in self.central_cis_links.items():
if cis_link.cig_id == command.cig_id:
self.central_cis_links.pop(cis_handle)
status = HCI_SUCCESS
return struct.pack('<BH', status, command.cig_id)
def on_hci_le_accept_cis_request_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.101 LE Accept CIS Request Command
'''
if not self.link:
return
if not (
pending_cis_link := self.peripheral_cis_links.get(command.connection_handle)
):
logger.error(f'Cannot find CIS with handle={command.connection_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
assert pending_cis_link.acl_connection
self.link.accept_cis(
peripheral_controller=self,
central_address=pending_cis_link.acl_connection.peer_address,
cig_id=pending_cis_link.cig_id,
cis_id=pending_cis_link.cis_id,
)
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
def on_hci_le_setup_iso_data_path_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command

View File

@@ -97,12 +97,16 @@ class BaseError(Exception):
namespace = f'{self.error_namespace}/'
else:
namespace = ''
error_text = {
(True, True): f'{self.error_name} [0x{self.error_code:X}]',
(True, False): self.error_name,
(False, True): f'0x{self.error_code:X}',
(False, False): '',
}[(self.error_name != '', self.error_code is not None)]
have_name = self.error_name != ''
have_code = self.error_code is not None
if have_name and have_code:
error_text = f'{self.error_name} [0x{self.error_code:X}]'
elif have_name and not have_code:
error_text = self.error_name
elif not have_name and have_code:
error_text = f'0x{self.error_code:X}'
else:
error_text = '<unspecified>'
return f'{type(self).__name__}({namespace}{error_text})'
@@ -319,7 +323,7 @@ BT_HIDP_PROTOCOL_ID = UUID.from_16_bits(0x0011, 'HIDP')
BT_HARDCOPY_CONTROL_CHANNEL_PROTOCOL_ID = UUID.from_16_bits(0x0012, 'HardcopyControlChannel')
BT_HARDCOPY_DATA_CHANNEL_PROTOCOL_ID = UUID.from_16_bits(0x0014, 'HardcopyDataChannel')
BT_HARDCOPY_NOTIFICATION_PROTOCOL_ID = UUID.from_16_bits(0x0016, 'HardcopyNotification')
BT_AVTCP_PROTOCOL_ID = UUID.from_16_bits(0x0017, 'AVCTP')
BT_AVCTP_PROTOCOL_ID = UUID.from_16_bits(0x0017, 'AVCTP')
BT_AVDTP_PROTOCOL_ID = UUID.from_16_bits(0x0019, 'AVDTP')
BT_CMTP_PROTOCOL_ID = UUID.from_16_bits(0x001B, 'CMTP')
BT_MCAP_CONTROL_CHANNEL_PROTOCOL_ID = UUID.from_16_bits(0x001E, 'MCAPControlChannel')
@@ -821,8 +825,8 @@ class AdvertisingData:
ad_structures = []
self.ad_structures = ad_structures[:]
@staticmethod
def from_bytes(data):
@classmethod
def from_bytes(cls, data: bytes) -> AdvertisingData:
instance = AdvertisingData()
instance.append(data)
return instance
@@ -978,7 +982,7 @@ class AdvertisingData:
return ad_data
def append(self, data):
def append(self, data: bytes) -> None:
offset = 0
while offset + 1 < len(data):
length = data[offset]

File diff suppressed because it is too large Load Diff

View File

@@ -1068,7 +1068,7 @@ class Client:
logger.warning('!!! unexpected response, there is no pending request')
return
# Sanity check: the response should match the pending request unless it is
# The response should match the pending request unless it is
# an error response
if att_pdu.op_code != ATT_ERROR_RESPONSE:
expected_response_name = self.pending_request.name.replace(

View File

@@ -328,7 +328,7 @@ class Server(EventEmitter):
f'handle=0x{characteristic.handle:04X}: {value.hex()}'
)
# Sanity check
# Check parameters
if len(value) != 2:
logger.warning('CCCD value not 2 bytes long')
return

View File

@@ -23,7 +23,7 @@ import functools
import logging
import secrets
import struct
from typing import Any, Dict, Callable, Optional, Type, Union, List
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union
from bumble import crypto
from .colors import color
@@ -223,41 +223,47 @@ HCI_VENDOR_EVENT = 0xFF
# HCI Subevent Codes
HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01
HCI_LE_ADVERTISING_REPORT_EVENT = 0x02
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06
HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B
HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10
HCI_LE_SCAN_TIMEOUT_EVENT = 0x11
HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15
HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16
HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18
HCI_LE_CIS_ESTABLISHED_EVENT = 0X19
HCI_LE_CIS_REQUEST_EVENT = 0X1A
HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D
HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F
HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22
HCI_LE_SUBRATE_CHANGE_EVENT = 0X23
HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01
HCI_LE_ADVERTISING_REPORT_EVENT = 0x02
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06
HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B
HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10
HCI_LE_SCAN_TIMEOUT_EVENT = 0x11
HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15
HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16
HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18
HCI_LE_CIS_ESTABLISHED_EVENT = 0X19
HCI_LE_CIS_REQUEST_EVENT = 0X1A
HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D
HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F
HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22
HCI_LE_SUBRATE_CHANGE_EVENT = 0X23
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT = 0X24
HCI_LE_PERIODIC_ADVERTISING_REPORT_V2_EVENT = 0X25
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_V2_EVENT = 0X26
HCI_LE_PERIODIC_ADVERTISING_SUBEVENT_DATA_REQUEST_EVENT = 0X27
HCI_LE_PERIODIC_ADVERTISING_RESPONSE_REPORT_EVENT = 0X28
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT = 0X29
# HCI Command
@@ -650,47 +656,6 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
# Command Status codes
HCI_COMMAND_STATUS_PENDING = 0
# LE Event Masks
HCI_LE_CONNECTION_COMPLETE_EVENT_MASK = (1 << 0)
HCI_LE_ADVERTISING_REPORT_EVENT_MASK = (1 << 1)
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT_MASK = (1 << 2)
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT_MASK = (1 << 3)
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT_MASK = (1 << 4)
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT_MASK = (1 << 5)
HCI_LE_DATA_LENGTH_CHANGE_EVENT_MASK = (1 << 6)
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT_MASK = (1 << 7)
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT_MASK = (1 << 8)
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT_MASK = (1 << 9)
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT_MASK = (1 << 10)
HCI_LE_PHY_UPDATE_COMPLETE_EVENT_MASK = (1 << 11)
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT_MASK = (1 << 12)
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT_MASK = (1 << 13)
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT_MASK = (1 << 14)
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT_MASK = (1 << 15)
HCI_LE_EXTENDED_SCAN_TIMEOUT_EVENT_MASK = (1 << 16)
HCI_LE_EXTENDED_ADVERTISING_SET_TERMINATED_EVENT_MASK = (1 << 17)
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT_MASK = (1 << 18)
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT_MASK = (1 << 19)
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT_MASK = (1 << 20)
HCI_LE_CONNECTION_IQ_REPORT_EVENT_MASK = (1 << 21)
HCI_LE_CTE_REQUEST_FAILED_EVENT_MASK = (1 << 22)
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT_MASK = (1 << 23)
HCI_LE_CIS_ESTABLISHED_EVENT_MASK = (1 << 24)
HCI_LE_CIS_REQUEST_EVENT_MASK = (1 << 25)
HCI_LE_CREATE_BIG_COMPLETE_EVENT_MASK = (1 << 26)
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT_MASK = (1 << 27)
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT_MASK = (1 << 28)
HCI_LE_BIG_SYNC_LOST_EVENT_MASK = (1 << 29)
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT_MASK = (1 << 30)
HCI_LE_PATH_LOSS_THRESHOLD_EVENT_MASK = (1 << 31)
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT_MASK = (1 << 32)
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT_MASK = (1 << 33)
HCI_LE_SUBRATE_CHANGE_EVENT_MASK = (1 << 34)
HCI_LE_EVENT_MASK_NAMES = {
mask: mask_name for (mask_name, mask) in globals().items()
if mask_name.startswith('HCI_LE_') and mask_name.endswith('_EVENT_MASK')
}
# ACL
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
@@ -732,15 +697,15 @@ HCI_LE_PHY_TYPE_TO_BIT = {
class Phy(enum.IntEnum):
LE_1M = 0x01
LE_2M = 0x02
LE_CODED = 0x03
LE_1M = HCI_LE_1M_PHY
LE_2M = HCI_LE_2M_PHY
LE_CODED = HCI_LE_CODED_PHY
class PhyBit(enum.IntFlag):
LE_1M = 0b00000001
LE_2M = 0b00000010
LE_CODED = 0b00000100
LE_1M = 1 << HCI_LE_1M_PHY_BIT
LE_2M = 1 << HCI_LE_2M_PHY_BIT
LE_CODED = 1 << HCI_LE_CODED_PHY_BIT
# Connection Parameters
@@ -1360,51 +1325,97 @@ HCI_SUPPORTED_COMMANDS_FLAGS = (
# LE Supported Features
# See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
class LeFeature(enum.IntEnum):
LE_ENCRYPTION = 0
CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1
EXTENDED_REJECT_INDICATION = 2
PERIPHERAL_INITIATED_FEATURE_EXCHANGE = 3
LE_PING = 4
LE_DATA_PACKET_LENGTH_EXTENSION = 5
LL_PRIVACY = 6
EXTENDED_SCANNER_FILTER_POLICIES = 7
LE_2M_PHY = 8
STABLE_MODULATION_INDEX_TRANSMITTER = 9
STABLE_MODULATION_INDEX_RECEIVER = 10
LE_CODED_PHY = 11
LE_EXTENDED_ADVERTISING = 12
LE_PERIODIC_ADVERTISING = 13
CHANNEL_SELECTION_ALGORITHM_2 = 14
LE_POWER_CLASS_1 = 15
MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE = 16
CONNECTION_CTE_REQUEST = 17
CONNECTION_CTE_RESPONSE = 18
CONNECTIONLESS_CTE_TRANSMITTER = 19
CONNECTIONLESS_CTR_RECEIVER = 20
ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION = 21
ANTENNA_SWITCHING_DURING_CTE_RECEPTION = 22
RECEIVING_CONSTANT_TONE_EXTENSIONS = 23
PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER = 24
PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT = 25
SLEEP_CLOCK_ACCURACY_UPDATES = 26
REMOTE_PUBLIC_KEY_VALIDATION = 27
CONNECTED_ISOCHRONOUS_STREAM_CENTRAL = 28
CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL = 29
ISOCHRONOUS_BROADCASTER = 30
SYNCHRONIZED_RECEIVER = 31
CONNECTED_ISOCHRONOUS_STREAM = 32
LE_POWER_CONTROL_REQUEST = 33
LE_POWER_CONTROL_REQUEST_DUP = 34
LE_PATH_LOSS_MONITORING = 35
PERIODIC_ADVERTISING_ADI_SUPPORT = 36
CONNECTION_SUBRATING = 37
CONNECTION_SUBRATING_HOST_SUPPORT = 38
CHANNEL_CLASSIFICATION = 39
ADVERTISING_CODING_SELECTION = 40
ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 41
PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 43
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 44
class LeFeatureMask(enum.IntFlag):
LE_ENCRYPTION = 1 << 0
CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1 << 1
EXTENDED_REJECT_INDICATION = 1 << 2
PERIPHERAL_INITIATED_FEATURE_EXCHANGE = 1 << 3
LE_PING = 1 << 4
LE_DATA_PACKET_LENGTH_EXTENSION = 1 << 5
LL_PRIVACY = 1 << 6
EXTENDED_SCANNER_FILTER_POLICIES = 1 << 7
LE_2M_PHY = 1 << 8
STABLE_MODULATION_INDEX_TRANSMITTER = 1 << 9
STABLE_MODULATION_INDEX_RECEIVER = 1 << 10
LE_CODED_PHY = 1 << 11
LE_EXTENDED_ADVERTISING = 1 << 12
LE_PERIODIC_ADVERTISING = 1 << 13
CHANNEL_SELECTION_ALGORITHM_2 = 1 << 14
LE_POWER_CLASS_1 = 1 << 15
MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE = 1 << 16
CONNECTION_CTE_REQUEST = 1 << 17
CONNECTION_CTE_RESPONSE = 1 << 18
CONNECTIONLESS_CTE_TRANSMITTER = 1 << 19
CONNECTIONLESS_CTR_RECEIVER = 1 << 20
ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION = 1 << 21
ANTENNA_SWITCHING_DURING_CTE_RECEPTION = 1 << 22
RECEIVING_CONSTANT_TONE_EXTENSIONS = 1 << 23
PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER = 1 << 24
PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT = 1 << 25
SLEEP_CLOCK_ACCURACY_UPDATES = 1 << 26
REMOTE_PUBLIC_KEY_VALIDATION = 1 << 27
CONNECTED_ISOCHRONOUS_STREAM_CENTRAL = 1 << 28
CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL = 1 << 29
ISOCHRONOUS_BROADCASTER = 1 << 30
SYNCHRONIZED_RECEIVER = 1 << 31
CONNECTED_ISOCHRONOUS_STREAM = 1 << 32
LE_POWER_CONTROL_REQUEST = 1 << 33
LE_POWER_CONTROL_REQUEST_DUP = 1 << 34
LE_PATH_LOSS_MONITORING = 1 << 35
PERIODIC_ADVERTISING_ADI_SUPPORT = 1 << 36
CONNECTION_SUBRATING = 1 << 37
CONNECTION_SUBRATING_HOST_SUPPORT = 1 << 38
CHANNEL_CLASSIFICATION = 1 << 39
ADVERTISING_CODING_SELECTION = 1 << 40
ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 1 << 41
PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 1 << 43
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 1 << 44
LE_ENCRYPTION = 1 << LeFeature.LE_ENCRYPTION
CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1 << LeFeature.CONNECTION_PARAMETERS_REQUEST_PROCEDURE
EXTENDED_REJECT_INDICATION = 1 << LeFeature.EXTENDED_REJECT_INDICATION
PERIPHERAL_INITIATED_FEATURE_EXCHANGE = 1 << LeFeature.PERIPHERAL_INITIATED_FEATURE_EXCHANGE
LE_PING = 1 << LeFeature.LE_PING
LE_DATA_PACKET_LENGTH_EXTENSION = 1 << LeFeature.LE_DATA_PACKET_LENGTH_EXTENSION
LL_PRIVACY = 1 << LeFeature.LL_PRIVACY
EXTENDED_SCANNER_FILTER_POLICIES = 1 << LeFeature.EXTENDED_SCANNER_FILTER_POLICIES
LE_2M_PHY = 1 << LeFeature.LE_2M_PHY
STABLE_MODULATION_INDEX_TRANSMITTER = 1 << LeFeature.STABLE_MODULATION_INDEX_TRANSMITTER
STABLE_MODULATION_INDEX_RECEIVER = 1 << LeFeature.STABLE_MODULATION_INDEX_RECEIVER
LE_CODED_PHY = 1 << LeFeature.LE_CODED_PHY
LE_EXTENDED_ADVERTISING = 1 << LeFeature.LE_EXTENDED_ADVERTISING
LE_PERIODIC_ADVERTISING = 1 << LeFeature.LE_PERIODIC_ADVERTISING
CHANNEL_SELECTION_ALGORITHM_2 = 1 << LeFeature.CHANNEL_SELECTION_ALGORITHM_2
LE_POWER_CLASS_1 = 1 << LeFeature.LE_POWER_CLASS_1
MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE = 1 << LeFeature.MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE
CONNECTION_CTE_REQUEST = 1 << LeFeature.CONNECTION_CTE_REQUEST
CONNECTION_CTE_RESPONSE = 1 << LeFeature.CONNECTION_CTE_RESPONSE
CONNECTIONLESS_CTE_TRANSMITTER = 1 << LeFeature.CONNECTIONLESS_CTE_TRANSMITTER
CONNECTIONLESS_CTR_RECEIVER = 1 << LeFeature.CONNECTIONLESS_CTR_RECEIVER
ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION = 1 << LeFeature.ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION
ANTENNA_SWITCHING_DURING_CTE_RECEPTION = 1 << LeFeature.ANTENNA_SWITCHING_DURING_CTE_RECEPTION
RECEIVING_CONSTANT_TONE_EXTENSIONS = 1 << LeFeature.RECEIVING_CONSTANT_TONE_EXTENSIONS
PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER = 1 << LeFeature.PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER
PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT = 1 << LeFeature.PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT
SLEEP_CLOCK_ACCURACY_UPDATES = 1 << LeFeature.SLEEP_CLOCK_ACCURACY_UPDATES
REMOTE_PUBLIC_KEY_VALIDATION = 1 << LeFeature.REMOTE_PUBLIC_KEY_VALIDATION
CONNECTED_ISOCHRONOUS_STREAM_CENTRAL = 1 << LeFeature.CONNECTED_ISOCHRONOUS_STREAM_CENTRAL
CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL = 1 << LeFeature.CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL
ISOCHRONOUS_BROADCASTER = 1 << LeFeature.ISOCHRONOUS_BROADCASTER
SYNCHRONIZED_RECEIVER = 1 << LeFeature.SYNCHRONIZED_RECEIVER
CONNECTED_ISOCHRONOUS_STREAM = 1 << LeFeature.CONNECTED_ISOCHRONOUS_STREAM
LE_POWER_CONTROL_REQUEST = 1 << LeFeature.LE_POWER_CONTROL_REQUEST
LE_POWER_CONTROL_REQUEST_DUP = 1 << LeFeature.LE_POWER_CONTROL_REQUEST_DUP
LE_PATH_LOSS_MONITORING = 1 << LeFeature.LE_PATH_LOSS_MONITORING
PERIODIC_ADVERTISING_ADI_SUPPORT = 1 << LeFeature.PERIODIC_ADVERTISING_ADI_SUPPORT
CONNECTION_SUBRATING = 1 << LeFeature.CONNECTION_SUBRATING
CONNECTION_SUBRATING_HOST_SUPPORT = 1 << LeFeature.CONNECTION_SUBRATING_HOST_SUPPORT
CHANNEL_CLASSIFICATION = 1 << LeFeature.CHANNEL_CLASSIFICATION
ADVERTISING_CODING_SELECTION = 1 << LeFeature.ADVERTISING_CODING_SELECTION
ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 1 << LeFeature.ADVERTISING_CODING_SELECTION_HOST_SUPPORT
PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 1 << LeFeature.PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 1 << LeFeature.PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER
# fmt: on
@@ -2864,6 +2875,20 @@ class HCI_Set_Event_Mask_Command(HCI_Command):
See Bluetooth spec @ 7.3.1 Set Event Mask Command
'''
@staticmethod
def mask(event_codes: Iterable[int]) -> bytes:
'''
Compute the event mask value for a list of events.
'''
# NOTE: this implementation takes advantage of the fact that as of version 5.4
# of the core specification, the bit number for each event code is equal to one
# less than the event code.
# If future versions of the specification deviate from that, a different
# implementation would be needed.
return sum((1 << event_code - 1) for event_code in event_codes).to_bytes(
8, 'little'
)
# -----------------------------------------------------------------------------
@HCI_Command.command()
@@ -3387,6 +3412,20 @@ class HCI_LE_Set_Event_Mask_Command(HCI_Command):
See Bluetooth spec @ 7.8.1 LE Set Event Mask Command
'''
@staticmethod
def mask(event_codes: Iterable[int]) -> bytes:
'''
Compute the event mask value for a list of events.
'''
# NOTE: this implementation takes advantage of the fact that as of version 5.4
# of the core specification, the bit number for each event code is equal to one
# less than the event code.
# If future versions of the specification deviate from that, a different
# implementation would be needed.
return sum((1 << event_code - 1) for event_code in event_codes).to_bytes(
8, 'little'
)
# -----------------------------------------------------------------------------
@HCI_Command.command(
@@ -3994,13 +4033,16 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
('advertising_sid', 1),
('scan_request_notification_enable', 1),
],
return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx__power', 1)],
return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx_power', 1)],
)
class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
'''
TX_POWER_NO_PREFERENCE = 0x7F
SHOULD_NOT_FRAGMENT = 0x01
class AdvertisingProperties(enum.IntFlag):
CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1
@@ -4245,7 +4287,7 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
('scanning_filter_policy:', self.scanning_filter_policy),
('scanning_phys: ', ','.join(scanning_phys_strs)),
]
for (i, scanning_phy_str) in enumerate(scanning_phys_strs):
for i, scanning_phy_str in enumerate(scanning_phys_strs):
fields.append(
(
f'{scanning_phy_str}.scan_type: ',
@@ -4388,7 +4430,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
('peer_address: ', str(self.peer_address)),
('initiating_phys: ', ','.join(initiating_phys_strs)),
]
for (i, initiating_phys_str) in enumerate(initiating_phys_strs):
for i, initiating_phys_str in enumerate(initiating_phys_strs):
fields.append(
(
f'{initiating_phys_str}.scan_interval: ',
@@ -4859,7 +4901,8 @@ class HCI_Extended_Event(HCI_Event):
HCI_Object.init_from_bytes(self, parameters, 1, fields)
return self
def __init__(self, subevent_code, parameters, **kwargs):
def __init__(self, subevent_code=None, parameters=None, **kwargs):
assert subevent_code is not None
self.subevent_code = subevent_code
if parameters is None and (fields := getattr(self, 'fields', None)) and kwargs:
parameters = bytes([subevent_code]) + HCI_Object.dict_to_bytes(
@@ -5274,7 +5317,7 @@ HCI_LE_Meta_Event.subevent_classes[
('status', 1),
('advertising_handle', 1),
('connection_handle', 2),
('number_completed_extended_advertising_events', 1),
('num_completed_extended_advertising_events', 1),
]
)
class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event):
@@ -6215,7 +6258,7 @@ class HCI_IsoDataPacket(HCI_Packet):
if ts_flag:
if not should_include_sdu_info:
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}')
logger.warning(f'Timestamp included when pb_flag={bin(pb_flag)}')
time_stamp, *_ = struct.unpack_from('<I', packet, pos)
pos += 4
@@ -6328,7 +6371,7 @@ class HCI_AclDataPacketAssembler:
self.current_data = None
self.l2cap_pdu_length = 0
else:
# Sanity check
# Compliance check
if len(self.current_data) > self.l2cap_pdu_length + 4:
logger.warning('!!! ACL data exceeds L2CAP PDU')
self.current_data = None

View File

@@ -21,7 +21,13 @@ from collections.abc import Callable, MutableMapping
from typing import cast, Any, Optional
import logging
from bumble import avc
from bumble import avctp
from bumble import avdtp
from bumble import avrcp
from bumble import crypto
from bumble import rfcomm
from bumble import sdp
from bumble.colors import color
from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
@@ -47,9 +53,7 @@ from bumble.hci import (
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
)
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM
from bumble import crypto
# -----------------------------------------------------------------------------
# Logging
@@ -59,11 +63,14 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP',
rfcomm.RFCOMM_PSM: 'RFCOMM',
sdp.SDP_PSM: 'SDP',
avdtp.AVDTP_PSM: 'AVDTP',
avctp.AVCTP_PSM: 'AVCTP'
# TODO: add more PSM values
}
AVCTP_PID_NAMES = {avrcp.AVRCP_PID: 'AVRCP'}
# -----------------------------------------------------------------------------
class PacketTracer:
@@ -71,17 +78,20 @@ class PacketTracer:
psms: MutableMapping[int, int]
peer: Optional[PacketTracer.AclStream]
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
avctp_assemblers: MutableMapping[int, avctp.MessageAssembler]
def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.avctp_assemblers = {} # AVCTP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None
# pylint: disable=too-many-nested-blocks
def on_acl_pdu(self, pdu: bytes) -> None:
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
self.analyzer.emit(l2cap_pdu)
if l2cap_pdu.cid == ATT_CID:
att_pdu = ATT_PDU.from_bytes(l2cap_pdu.payload)
@@ -103,42 +113,51 @@ class PacketTracer:
connection_response.result
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
):
if self.peer:
if psm := self.peer.psms.get(
connection_response.source_cid
):
# Found a pending connection
self.psms[connection_response.destination_cid] = psm
# For AVDTP connections, create a packet assembler for
# each direction
if psm == avdtp.AVDTP_PSM:
self.avdtp_assemblers[
connection_response.source_cid
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
connection_response.destination_cid
] = avdtp.MessageAssembler(
self.peer.on_avdtp_message
)
if self.peer and (
psm := self.peer.psms.get(connection_response.source_cid)
):
# Found a pending connection
self.psms[connection_response.destination_cid] = psm
# For AVDTP connections, create a packet assembler for
# each direction
if psm == avdtp.AVDTP_PSM:
self.avdtp_assemblers[
connection_response.source_cid
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
connection_response.destination_cid
] = avdtp.MessageAssembler(self.peer.on_avdtp_message)
elif psm == avctp.AVCTP_PSM:
self.avctp_assemblers[
connection_response.source_cid
] = avctp.MessageAssembler(self.on_avctp_message)
self.peer.avctp_assemblers[
connection_response.destination_cid
] = avctp.MessageAssembler(self.peer.on_avctp_message)
else:
# Try to find the PSM associated with this PDU
if self.peer and (psm := self.peer.psms.get(l2cap_pdu.cid)):
if psm == SDP_PSM:
sdp_pdu = SDP_PDU.from_bytes(l2cap_pdu.payload)
if psm == sdp.SDP_PSM:
sdp_pdu = sdp.SDP_PDU.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(sdp_pdu)
elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
elif psm == rfcomm.RFCOMM_PSM:
rfcomm_frame = rfcomm.RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame)
elif psm == avdtp.AVDTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
)
assembler = self.avdtp_assemblers.get(l2cap_pdu.cid)
if assembler:
assembler.on_pdu(l2cap_pdu.payload)
if avdtp_assembler := self.avdtp_assemblers.get(l2cap_pdu.cid):
avdtp_assembler.on_pdu(l2cap_pdu.payload)
elif psm == avctp.AVCTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVCTP]: {l2cap_pdu.payload.hex()}'
)
if avctp_assembler := self.avctp_assemblers.get(l2cap_pdu.cid):
avctp_assembler.on_pdu(l2cap_pdu.payload)
else:
psm_string = name_or_number(PSM_NAMES, psm)
self.analyzer.emit(
@@ -155,6 +174,28 @@ class PacketTracer:
f'{color("AVDTP", "green")} [{transaction_label}] {message}'
)
def on_avctp_message(
self,
transaction_label: int,
is_command: bool,
ipid: bool,
pid: int,
payload: bytes,
):
if pid == avrcp.AVRCP_PID:
avc_frame = avc.Frame.from_bytes(payload)
details = str(avc_frame)
else:
details = payload.hex()
c_r = 'Command' if is_command else 'Response'
self.analyzer.emit(
f'{color("AVCTP", "green")} '
f'{c_r}[{transaction_label}][{name_or_number(AVCTP_PID_NAMES, pid)}] '
f'{"#" if ipid else ""}'
f'{details}'
)
def feed_packet(self, packet: HCI_AclDataPacket) -> None:
self.packet_assembler.feed_packet(packet)

View File

@@ -18,6 +18,7 @@
from __future__ import annotations
import asyncio
import collections
import dataclasses
import logging
import struct
@@ -27,59 +28,15 @@ from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
from bumble import drivers
from .hci import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET,
HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
HCI_RESET_COMMAND,
HCI_SUCCESS,
HCI_SUPPORTED_COMMANDS_FLAGS,
HCI_SYNCHRONOUS_DATA_PACKET,
HCI_VERSION_BLUETOOTH_CORE_4_0,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Constant,
HCI_Error,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Local_Supported_Features_Command,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command,
HCI_LE_Set_Event_Mask_Command,
HCI_LE_Write_Suggested_Default_Data_Length_Command,
HCI_Link_Key_Request_Negative_Reply_Command,
HCI_Link_Key_Request_Reply_Command,
HCI_Packet,
HCI_Read_Buffer_Size_Command,
HCI_Read_Local_Supported_Commands_Command,
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_SynchronousDataPacket,
LeFeatureMask,
)
from .core import (
from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
ConnectionPHY,
ConnectionParameters,
)
from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
from bumble.utils import AbortableEventEmitter
from bumble.transport.common import TransportLostError
if TYPE_CHECKING:
from .transport.common import TransportSink, TransportSource
@@ -99,15 +56,15 @@ class AclPacketQueue:
self,
max_packet_size: int,
max_in_flight: int,
send: Callable[[HCI_Packet], None],
send: Callable[[hci.HCI_Packet], None],
) -> None:
self.max_packet_size = max_packet_size
self.max_in_flight = max_in_flight
self.in_flight = 0
self.send = send
self.packets: Deque[HCI_AclDataPacket] = collections.deque()
self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque()
def enqueue(self, packet: HCI_AclDataPacket) -> None:
def enqueue(self, packet: hci.HCI_AclDataPacket) -> None:
self.packets.appendleft(packet)
self.check_queue()
@@ -139,11 +96,13 @@ class AclPacketQueue:
# -----------------------------------------------------------------------------
class Connection:
def __init__(self, host: Host, handle: int, peer_address: Address, transport: int):
def __init__(
self, host: Host, handle: int, peer_address: hci.Address, transport: int
):
self.host = host
self.handle = handle
self.peer_address = peer_address
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
acl_packet_queue: Optional[AclPacketQueue] = (
host.le_acl_packet_queue
@@ -153,7 +112,7 @@ class Connection:
assert acl_packet_queue
self.acl_packet_queue = acl_packet_queue
def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None:
def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
self.assembler.feed_packet(packet)
def on_acl_pdu(self, pdu: bytes) -> None:
@@ -161,9 +120,25 @@ class Connection:
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ScoLink:
peer_address: hci.Address
handle: int
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
peer_address: hci.Address
handle: int
# -----------------------------------------------------------------------------
class Host(AbortableEventEmitter):
connections: Dict[int, Connection]
cis_links: Dict[int, CisLink]
sco_links: Dict[int, ScoLink]
acl_packet_queue: Optional[AclPacketQueue] = None
le_acl_packet_queue: Optional[AclPacketQueue] = None
hci_sink: Optional[TransportSink] = None
@@ -171,7 +146,7 @@ class Host(AbortableEventEmitter):
long_term_key_provider: Optional[
Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
]
link_key_provider: Optional[Callable[[Address], Awaitable[Optional[bytes]]]]
link_key_provider: Optional[Callable[[hci.Address], Awaitable[Optional[bytes]]]]
def __init__(
self,
@@ -183,8 +158,12 @@ class Host(AbortableEventEmitter):
self.hci_metadata = {}
self.ready = False # True when we can accept incoming packets
self.connections = {} # Connections, by connection handle
self.cis_links = {} # CIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.pending_command = None
self.pending_response = None
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_le_features = 0
@@ -204,7 +183,7 @@ class Host(AbortableEventEmitter):
def find_connection_by_bd_addr(
self,
bd_addr: Address,
bd_addr: hci.Address,
transport: Optional[int] = None,
check_address_type: bool = False,
) -> Optional[Connection]:
@@ -246,49 +225,139 @@ class Host(AbortableEventEmitter):
# Send a reset command unless a driver has already done so.
if reset_needed:
await self.send_command(HCI_Reset_Command(), check_result=True)
await self.send_command(hci.HCI_Reset_Command(), check_result=True)
self.ready = True
response = await self.send_command(
HCI_Read_Local_Supported_Commands_Command(), check_result=True
hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True
)
self.local_supported_commands = response.return_parameters.supported_commands
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(
HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
hci.HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
)
self.local_le_features = struct.unpack(
'<Q', response.return_parameters.le_features
)[0]
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(
HCI_Read_Local_Version_Information_Command(), check_result=True
hci.HCI_Read_Local_Version_Information_Command(), check_result=True
)
self.local_version = response.return_parameters
await self.send_command(
HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('FFFFFFFFFFFFFF3F'))
hci.HCI_Set_Event_Mask_Command(
event_mask=hci.HCI_Set_Event_Mask_Command.mask(
[
hci.HCI_INQUIRY_COMPLETE_EVENT,
hci.HCI_INQUIRY_RESULT_EVENT,
hci.HCI_CONNECTION_COMPLETE_EVENT,
hci.HCI_CONNECTION_REQUEST_EVENT,
hci.HCI_DISCONNECTION_COMPLETE_EVENT,
hci.HCI_AUTHENTICATION_COMPLETE_EVENT,
hci.HCI_REMOTE_NAME_REQUEST_COMPLETE_EVENT,
hci.HCI_ENCRYPTION_CHANGE_EVENT,
hci.HCI_CHANGE_CONNECTION_LINK_KEY_COMPLETE_EVENT,
hci.HCI_LINK_KEY_TYPE_CHANGED_EVENT,
hci.HCI_READ_REMOTE_SUPPORTED_FEATURES_COMPLETE_EVENT,
hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMPLETE_EVENT,
hci.HCI_QOS_SETUP_COMPLETE_EVENT,
hci.HCI_HARDWARE_ERROR_EVENT,
hci.HCI_FLUSH_OCCURRED_EVENT,
hci.HCI_ROLE_CHANGE_EVENT,
hci.HCI_MODE_CHANGE_EVENT,
hci.HCI_RETURN_LINK_KEYS_EVENT,
hci.HCI_PIN_CODE_REQUEST_EVENT,
hci.HCI_LINK_KEY_REQUEST_EVENT,
hci.HCI_LINK_KEY_NOTIFICATION_EVENT,
hci.HCI_LOOPBACK_COMMAND_EVENT,
hci.HCI_DATA_BUFFER_OVERFLOW_EVENT,
hci.HCI_MAX_SLOTS_CHANGE_EVENT,
hci.HCI_READ_CLOCK_OFFSET_COMPLETE_EVENT,
hci.HCI_CONNECTION_PACKET_TYPE_CHANGED_EVENT,
hci.HCI_QOS_VIOLATION_EVENT,
hci.HCI_PAGE_SCAN_REPETITION_MODE_CHANGE_EVENT,
hci.HCI_FLOW_SPECIFICATION_COMPLETE_EVENT,
hci.HCI_INQUIRY_RESULT_WITH_RSSI_EVENT,
hci.HCI_READ_REMOTE_EXTENDED_FEATURES_COMPLETE_EVENT,
hci.HCI_SYNCHRONOUS_CONNECTION_COMPLETE_EVENT,
hci.HCI_SYNCHRONOUS_CONNECTION_CHANGED_EVENT,
hci.HCI_SNIFF_SUBRATING_EVENT,
hci.HCI_EXTENDED_INQUIRY_RESULT_EVENT,
hci.HCI_ENCRYPTION_KEY_REFRESH_COMPLETE_EVENT,
hci.HCI_IO_CAPABILITY_REQUEST_EVENT,
hci.HCI_IO_CAPABILITY_RESPONSE_EVENT,
hci.HCI_USER_CONFIRMATION_REQUEST_EVENT,
hci.HCI_USER_PASSKEY_REQUEST_EVENT,
hci.HCI_REMOTE_OOB_DATA_REQUEST_EVENT,
hci.HCI_SIMPLE_PAIRING_COMPLETE_EVENT,
hci.HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT,
hci.HCI_ENHANCED_FLUSH_COMPLETE_EVENT,
hci.HCI_USER_PASSKEY_NOTIFICATION_EVENT,
hci.HCI_KEYPRESS_NOTIFICATION_EVENT,
hci.HCI_REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION_EVENT,
hci.HCI_LE_META_EVENT,
]
)
)
)
if (
self.local_version is not None
and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0
and self.local_version.hci_version <= hci.HCI_VERSION_BLUETOOTH_CORE_4_0
):
# Some older controllers don't like event masks with bits they don't
# understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
le_event_mask = bytes.fromhex('FFFFFFFF00000000')
le_event_mask = hci.HCI_LE_Set_Event_Mask_Command.mask(
[
hci.HCI_LE_CONNECTION_COMPLETE_EVENT,
hci.HCI_LE_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_EVENT,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT,
hci.HCI_LE_DATA_LENGTH_CHANGE_EVENT,
hci.HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT,
hci.HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT,
hci.HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT,
hci.HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
hci.HCI_LE_SCAN_TIMEOUT_EVENT,
hci.HCI_LE_ADVERTISING_SET_TERMINATED_EVENT,
hci.HCI_LE_SCAN_REQUEST_RECEIVED_EVENT,
hci.HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT,
hci.HCI_LE_CONNECTION_IQ_REPORT_EVENT,
hci.HCI_LE_CTE_REQUEST_FAILED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT,
hci.HCI_LE_CIS_ESTABLISHED_EVENT,
hci.HCI_LE_CIS_REQUEST_EVENT,
hci.HCI_LE_CREATE_BIG_COMPLETE_EVENT,
hci.HCI_LE_TERMINATE_BIG_COMPLETE_EVENT,
hci.HCI_LE_BIG_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_BIG_SYNC_LOST_EVENT,
hci.HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT,
hci.HCI_LE_PATH_LOSS_THRESHOLD_EVENT,
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
]
)
await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
hci.HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
)
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
if self.supports_command(hci.HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
HCI_Read_Buffer_Size_Command(), check_result=True
hci.HCI_Read_Buffer_Size_Command(), check_result=True
)
hc_acl_data_packet_length = (
response.return_parameters.hc_acl_data_packet_length
@@ -311,9 +380,9 @@ class Host(AbortableEventEmitter):
hc_le_acl_data_packet_length = 0
hc_total_num_le_acl_data_packets = 0
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True
hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
hc_le_acl_data_packet_length = (
response.return_parameters.hc_le_acl_data_packet_length
@@ -340,10 +409,12 @@ class Host(AbortableEventEmitter):
)
if self.supports_command(
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
) and self.supports_command(HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
) and self.supports_command(
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
):
response = await self.send_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command()
hci.HCI_LE_Read_Suggested_Default_Data_Length_Command()
)
suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
@@ -352,12 +423,34 @@ class Host(AbortableEventEmitter):
or suggested_max_tx_time != self.suggested_max_tx_time
):
await self.send_command(
HCI_LE_Write_Suggested_Default_Data_Length_Command(
hci.HCI_LE_Write_Suggested_Default_Data_Length_Command(
suggested_max_tx_octets=self.suggested_max_tx_octets,
suggested_max_tx_time=self.suggested_max_tx_time,
)
)
if self.supports_command(
hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
):
response = await self.send_command(
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(),
check_result=True,
)
self.number_of_supported_advertising_sets = (
response.return_parameters.num_supported_advertising_sets
)
if self.supports_command(
hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND
):
response = await self.send_command(
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command(),
check_result=True,
)
self.maximum_advertising_data_length = (
response.return_parameters.max_advertising_data_length
)
@property
def controller(self) -> Optional[TransportSink]:
return self.hci_sink
@@ -375,7 +468,7 @@ class Host(AbortableEventEmitter):
source.set_packet_sink(self)
self.hci_metadata = getattr(source, 'metadata', self.hci_metadata)
def send_hci_packet(self, packet: HCI_Packet) -> None:
def send_hci_packet(self, packet: hci.HCI_Packet) -> None:
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}')
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
@@ -406,11 +499,12 @@ class Host(AbortableEventEmitter):
else:
status = response.return_parameters.status
if status != HCI_SUCCESS:
if status != hci.HCI_SUCCESS:
logger.warning(
f'{command.name} failed ({HCI_Constant.error_name(status)})'
f'{command.name} failed '
f'({hci.HCI_Constant.error_name(status)})'
)
raise HCI_Error(status)
raise hci.HCI_Error(status)
return response
except Exception as error:
@@ -423,8 +517,8 @@ class Host(AbortableEventEmitter):
self.pending_response = None
# Use this method to send a command from a task
def send_command_sync(self, command: HCI_Command) -> None:
async def send_command(command: HCI_Command) -> None:
def send_command_sync(self, command: hci.HCI_Command) -> None:
async def send_command(command: hci.HCI_Command) -> None:
await self.send_command(command)
asyncio.create_task(send_command(command))
@@ -449,7 +543,7 @@ class Host(AbortableEventEmitter):
pb_flag = 0
while bytes_remaining:
data_total_length = min(bytes_remaining, packet_queue.max_packet_size)
acl_packet = HCI_AclDataPacket(
acl_packet = hci.HCI_AclDataPacket(
connection_handle=connection_handle,
pb_flag=pb_flag,
bc_flag=0,
@@ -464,7 +558,7 @@ class Host(AbortableEventEmitter):
def supports_command(self, command):
# Find the support flag position for this command
for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for octet, flags in enumerate(hci.HCI_SUPPORTED_COMMANDS_FLAGS):
for flag_position, value in enumerate(flags):
if value == command:
# Check if the flag is set
@@ -479,16 +573,16 @@ class Host(AbortableEventEmitter):
def supported_commands(self):
commands = []
for octet, flags in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
if octet < len(hci.HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
command = HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
command = hci.HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
if command is not None:
commands.append(command)
return commands
def supports_le_features(self, feature: LeFeatureMask) -> bool:
def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature
@property
@@ -499,10 +593,10 @@ class Host(AbortableEventEmitter):
# Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet: bytes) -> None:
hci_packet = HCI_Packet.from_bytes(packet)
hci_packet = hci.HCI_Packet.from_bytes(packet)
if self.ready or (
isinstance(hci_packet, HCI_Command_Complete_Event)
and hci_packet.command_opcode == HCI_RESET_COMMAND
isinstance(hci_packet, hci.HCI_Command_Complete_Event)
and hci_packet.command_opcode == hci.HCI_RESET_COMMAND
):
self.on_hci_packet(hci_packet)
else:
@@ -515,44 +609,44 @@ class Host(AbortableEventEmitter):
self.emit('flush')
def on_hci_packet(self, packet: HCI_Packet) -> None:
def on_hci_packet(self, packet: hci.HCI_Packet) -> None:
logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}')
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST)
# If the packet is a command, invoke the handler for this packet
if packet.hci_packet_type == HCI_COMMAND_PACKET:
self.on_hci_command_packet(cast(HCI_Command, packet))
elif packet.hci_packet_type == HCI_EVENT_PACKET:
self.on_hci_event_packet(cast(HCI_Event, packet))
elif packet.hci_packet_type == HCI_ACL_DATA_PACKET:
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
if packet.hci_packet_type == hci.HCI_COMMAND_PACKET:
self.on_hci_command_packet(cast(hci.HCI_Command, packet))
elif packet.hci_packet_type == hci.HCI_EVENT_PACKET:
self.on_hci_event_packet(cast(hci.HCI_Event, packet))
elif packet.hci_packet_type == hci.HCI_ACL_DATA_PACKET:
self.on_hci_acl_data_packet(cast(hci.HCI_AclDataPacket, packet))
elif packet.hci_packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(hci.HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == hci.HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(hci.HCI_IsoDataPacket, packet))
else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
def on_hci_command_packet(self, command: HCI_Command) -> None:
def on_hci_command_packet(self, command: hci.HCI_Command) -> None:
logger.warning(f'!!! unexpected command packet: {command}')
def on_hci_event_packet(self, event: HCI_Event) -> None:
def on_hci_event_packet(self, event: hci.HCI_Event) -> None:
handler_name = f'on_{event.name.lower()}'
handler = getattr(self, handler_name, self.on_hci_event)
handler(event)
def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None:
def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
# Look for the connection to which this data belongs
if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet)
def on_hci_sco_data_packet(self, packet: HCI_SynchronousDataPacket) -> None:
def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None:
# Experimental
self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
def on_hci_iso_data_packet(self, packet: hci.HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
@@ -616,11 +710,11 @@ class Host(AbortableEventEmitter):
def on_hci_le_connection_complete_event(self, event):
# Check if this is a cancellation
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### LE CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.peer_address} as {HCI_Constant.role_name(event.role)}'
f'{event.peer_address} as {hci.HCI_Constant.role_name(event.role)}'
)
connection = self.connections.get(event.connection_handle)
@@ -660,7 +754,7 @@ class Host(AbortableEventEmitter):
self.on_hci_le_connection_complete_event(event)
def on_hci_connection_complete_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] '
@@ -696,25 +790,38 @@ class Host(AbortableEventEmitter):
def on_hci_disconnection_complete_event(self, event):
# Find the connection
if (connection := self.connections.get(event.connection_handle)) is None:
handle = event.connection_handle
if (
connection := (
self.connections.get(handle)
or self.cis_links.get(handle)
or self.sco_links.get(handle)
)
) is None:
logger.warning('!!! DISCONNECTION COMPLETE: unknown handle')
return
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
logger.debug(
f'### DISCONNECTION: [0x{event.connection_handle:04X}] '
f'### DISCONNECTION: [0x{handle:04X}] '
f'{connection.peer_address} '
f'reason={event.reason}'
)
del self.connections[event.connection_handle]
# Notify the listeners
self.emit('disconnection', event.connection_handle, event.reason)
self.emit('disconnection', handle, event.reason)
# Remove the handle reference
_ = (
self.connections.pop(handle, 0)
or self.cis_links.pop(handle, 0)
or self.sco_links.pop(handle, 0)
)
else:
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners
self.emit('disconnection_failure', event.connection_handle, event.status)
self.emit('disconnection_failure', handle, event.status)
def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None:
@@ -722,7 +829,7 @@ class Host(AbortableEventEmitter):
return
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
connection_parameters = ConnectionParameters(
event.connection_interval,
event.peripheral_latency,
@@ -742,7 +849,7 @@ class Host(AbortableEventEmitter):
return
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
self.emit('connection_phy_update', connection.handle, connection_phy)
else:
@@ -761,6 +868,7 @@ class Host(AbortableEventEmitter):
event.status,
event.advertising_handle,
event.connection_handle,
event.num_completed_extended_advertising_events,
)
def on_hci_le_cis_request_event(self, event):
@@ -774,7 +882,11 @@ class Host(AbortableEventEmitter):
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.cis_links[event.connection_handle] = CisLink(
handle=event.connection_handle,
peer_address=hci.Address.ANY,
)
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
@@ -789,7 +901,7 @@ class Host(AbortableEventEmitter):
# For now, just accept everything
# TODO: delegate the decision
self.send_command_sync(
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
connection_handle=event.connection_handle,
interval_min=event.interval_min,
interval_max=event.interval_max,
@@ -820,12 +932,12 @@ class Host(AbortableEventEmitter):
),
)
if long_term_key:
response = HCI_LE_Long_Term_Key_Request_Reply_Command(
response = hci.HCI_LE_Long_Term_Key_Request_Reply_Command(
connection_handle=event.connection_handle,
long_term_key=long_term_key,
)
else:
response = HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(
response = hci.HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(
connection_handle=event.connection_handle
)
@@ -834,13 +946,18 @@ class Host(AbortableEventEmitter):
asyncio.create_task(send_long_term_key())
def on_hci_synchronous_connection_complete_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### SCO CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.bd_addr}'
)
self.sco_links[event.connection_handle] = ScoLink(
peer_address=event.bd_addr,
handle=event.connection_handle,
)
# Notify the client
self.emit(
'sco_connection',
@@ -858,16 +975,16 @@ class Host(AbortableEventEmitter):
pass
def on_hci_role_change_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
logger.debug(
f'role change for {event.bd_addr}: '
f'{HCI_Constant.role_name(event.new_role)}'
f'{hci.HCI_Constant.role_name(event.new_role)}'
)
self.emit('role_change', event.bd_addr, event.new_role)
else:
logger.debug(
f'role change for {event.bd_addr} failed: '
f'{HCI_Constant.error_name(event.status)}'
f'{hci.HCI_Constant.error_name(event.status)}'
)
self.emit('role_change_failure', event.bd_addr, event.status)
@@ -883,7 +1000,7 @@ class Host(AbortableEventEmitter):
def on_hci_authentication_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('connection_authentication', event.connection_handle)
else:
self.emit(
@@ -894,7 +1011,7 @@ class Host(AbortableEventEmitter):
def on_hci_encryption_change_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit(
'connection_encryption_change',
event.connection_handle,
@@ -907,7 +1024,7 @@ class Host(AbortableEventEmitter):
def on_hci_encryption_key_refresh_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('connection_encryption_key_refresh', event.connection_handle)
else:
self.emit(
@@ -928,16 +1045,16 @@ class Host(AbortableEventEmitter):
def on_hci_link_key_notification_event(self, event):
logger.debug(
f'link key for {event.bd_addr}: {event.link_key.hex()}, '
f'type={HCI_Constant.link_key_type_name(event.key_type)}'
f'type={hci.HCI_Constant.link_key_type_name(event.key_type)}'
)
self.emit('link_key', event.bd_addr, event.link_key, event.key_type)
def on_hci_simple_pairing_complete_event(self, event):
logger.debug(
f'simple pairing complete for {event.bd_addr}: '
f'status={HCI_Constant.status_name(event.status)}'
f'status={hci.HCI_Constant.status_name(event.status)}'
)
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('classic_pairing', event.bd_addr)
else:
self.emit('classic_pairing_failure', event.bd_addr, event.status)
@@ -957,11 +1074,11 @@ class Host(AbortableEventEmitter):
self.link_key_provider(event.bd_addr),
)
if link_key:
response = HCI_Link_Key_Request_Reply_Command(
response = hci.HCI_Link_Key_Request_Reply_Command(
bd_addr=event.bd_addr, link_key=link_key
)
else:
response = HCI_Link_Key_Request_Negative_Reply_Command(
response = hci.HCI_Link_Key_Request_Negative_Reply_Command(
bd_addr=event.bd_addr
)
@@ -1018,7 +1135,7 @@ class Host(AbortableEventEmitter):
)
def on_hci_remote_name_request_complete_event(self, event):
if event.status != HCI_SUCCESS:
if event.status != hci.HCI_SUCCESS:
self.emit('remote_name_failure', event.bd_addr, event.status)
else:
utf8_name = event.remote_name
@@ -1036,7 +1153,7 @@ class Host(AbortableEventEmitter):
)
def on_hci_le_read_remote_features_complete_event(self, event):
if event.status != HCI_SUCCESS:
if event.status != hci.HCI_SUCCESS:
self.emit(
'le_remote_features_failure', event.connection_handle, event.status
)

View File

@@ -208,7 +208,7 @@ class L2CAP_PDU:
@staticmethod
def from_bytes(data: bytes) -> L2CAP_PDU:
# Sanity check
# Check parameters
if len(data) < 4:
raise ValueError('not enough data for L2CAP header')

View File

@@ -196,6 +196,60 @@ class LocalLink:
if peripheral_controller := self.find_controller(peripheral_address):
peripheral_controller.on_link_encrypted(central_address, rand, ediv, ltk)
def create_cis(
self,
central_controller: controller.Controller,
peripheral_address: Address,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'$$$ CIS Request {central_controller.random_address} -> {peripheral_address}'
)
if peripheral_controller := self.find_controller(peripheral_address):
asyncio.get_running_loop().call_soon(
peripheral_controller.on_link_cis_request,
central_controller.random_address,
cig_id,
cis_id,
)
def accept_cis(
self,
peripheral_controller: controller.Controller,
central_address: Address,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'$$$ CIS Accept {peripheral_controller.random_address} -> {central_address}'
)
if central_controller := self.find_controller(central_address):
asyncio.get_running_loop().call_soon(
central_controller.on_link_cis_established, cig_id, cis_id
)
asyncio.get_running_loop().call_soon(
peripheral_controller.on_link_cis_established, cig_id, cis_id
)
def disconnect_cis(
self,
initiator_controller: controller.Controller,
peer_address: Address,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'$$$ CIS Disconnect {initiator_controller.random_address} -> {peer_address}'
)
if peer_controller := self.find_controller(peer_address):
asyncio.get_running_loop().call_soon(
initiator_controller.on_link_cis_disconnected, cig_id, cis_id
)
asyncio.get_running_loop().call_soon(
peer_controller.on_link_cis_disconnected, cig_id, cis_id
)
############################################################
# Classic handlers
############################################################

View File

@@ -110,7 +110,7 @@ class PairingDelegate(BasePairingDelegate):
event = self.add_origin(PairingEvent(just_works=empty_pb2.Empty()))
self.service.event_queue.put_nowait(event)
answer = await anext(self.service.event_answer) # pytype: disable=name-error
answer = await anext(self.service.event_answer) # type: ignore
assert answer.event == event
assert answer.answer_variant() == 'confirm' and answer.confirm is not None
return answer.confirm
@@ -125,7 +125,7 @@ class PairingDelegate(BasePairingDelegate):
event = self.add_origin(PairingEvent(numeric_comparison=number))
self.service.event_queue.put_nowait(event)
answer = await anext(self.service.event_answer) # pytype: disable=name-error
answer = await anext(self.service.event_answer) # type: ignore
assert answer.event == event
assert answer.answer_variant() == 'confirm' and answer.confirm is not None
return answer.confirm
@@ -140,7 +140,7 @@ class PairingDelegate(BasePairingDelegate):
event = self.add_origin(PairingEvent(passkey_entry_request=empty_pb2.Empty()))
self.service.event_queue.put_nowait(event)
answer = await anext(self.service.event_answer) # pytype: disable=name-error
answer = await anext(self.service.event_answer) # type: ignore
assert answer.event == event
if answer.answer_variant() is None:
return None
@@ -157,7 +157,7 @@ class PairingDelegate(BasePairingDelegate):
event = self.add_origin(PairingEvent(pin_code_request=empty_pb2.Empty()))
self.service.event_queue.put_nowait(event)
answer = await anext(self.service.event_answer) # pytype: disable=name-error
answer = await anext(self.service.event_answer) # type: ignore
assert answer.event == event
if answer.answer_variant() is None:
return None

View File

@@ -97,7 +97,8 @@ SDP_CLIENT_EXECUTABLE_URL_ATTRIBUTE_ID = 0X000B
SDP_ICON_URL_ATTRIBUTE_ID = 0X000C
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0X000D
# Attribute Identifier (cf. Assigned Numbers for Service Discovery)
# Profile-specific Attribute Identifiers (cf. Assigned Numbers for Service Discovery)
# used by AVRCP, HFP and A2DP
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID = 0x0311
@@ -115,7 +116,8 @@ SDP_ATTRIBUTE_ID_NAMES = {
SDP_DOCUMENTATION_URL_ATTRIBUTE_ID: 'SDP_DOCUMENTATION_URL_ATTRIBUTE_ID',
SDP_CLIENT_EXECUTABLE_URL_ATTRIBUTE_ID: 'SDP_CLIENT_EXECUTABLE_URL_ATTRIBUTE_ID',
SDP_ICON_URL_ATTRIBUTE_ID: 'SDP_ICON_URL_ATTRIBUTE_ID',
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: 'SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID'
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: 'SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID',
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: 'SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID',
}
SDP_PUBLIC_BROWSE_ROOT = core.UUID.from_16_bits(0x1002, 'PublicBrowseRoot')

View File

@@ -1134,8 +1134,10 @@ class Session:
async def get_link_key_and_derive_ltk(self) -> None:
'''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
if link_key is None:
self.link_key = await self.manager.device.get_link_key(
self.connection.peer_address
)
if self.link_key is None:
logging.warning(
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
)
@@ -1143,7 +1145,7 @@ class Session:
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
)
else:
self.ltk = self.derive_ltk(link_key, self.ct2)
self.ltk = self.derive_ltk(self.link_key, self.ct2)
def distribute_keys(self) -> None:
# Distribute the keys as required
@@ -1991,10 +1993,8 @@ class Manager(EventEmitter):
) -> None:
# Store the keys in the key store
if self.device.keystore and identity_address is not None:
self.device.abort_on(
'flush', self.device.update_keys(str(identity_address), keys)
)
# Make sure on_pairing emits after key update.
await self.device.update_keys(str(identity_address), keys)
# Notify the device
self.device.on_pairing(session.connection, identity_address, keys, session.sc)

View File

@@ -17,9 +17,10 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import traceback
import collections
import enum
import functools
import logging
import sys
import warnings
from typing import (
@@ -34,7 +35,7 @@ from typing import (
Union,
overload,
)
from functools import wraps, partial
from pyee import EventEmitter
from .colors import color
@@ -131,13 +132,14 @@ class EventWatcher:
Args:
emitter: EventEmitter to watch
event: Event name
handler: (Optional) Event handler. When nothing is passed, this method works as a decorator.
handler: (Optional) Event handler. When nothing is passed, this method
works as a decorator.
'''
def wrapper(f: _Handler) -> _Handler:
self.handlers.append((emitter, event, f))
emitter.on(event, f)
return f
def wrapper(wrapped: _Handler) -> _Handler:
self.handlers.append((emitter, event, wrapped))
emitter.on(event, wrapped)
return wrapped
return wrapper if handler is None else wrapper(handler)
@@ -157,13 +159,14 @@ class EventWatcher:
Args:
emitter: EventEmitter to watch
event: Event name
handler: (Optional) Event handler. When nothing passed, this method works as a decorator.
handler: (Optional) Event handler. When nothing passed, this method works
as a decorator.
'''
def wrapper(f: _Handler) -> _Handler:
self.handlers.append((emitter, event, f))
emitter.once(event, f)
return f
def wrapper(wrapped: _Handler) -> _Handler:
self.handlers.append((emitter, event, wrapped))
emitter.once(event, wrapped)
return wrapped
return wrapper if handler is None else wrapper(handler)
@@ -223,13 +226,13 @@ class CompositeEventEmitter(AbortableEventEmitter):
if self._listener:
# Call the deregistration methods for each base class that has them
for cls in self._listener.__class__.mro():
if hasattr(cls, '_bumble_register_composite'):
cls._bumble_deregister_composite(listener, self)
if '_bumble_register_composite' in cls.__dict__:
cls._bumble_deregister_composite(self._listener, self)
self._listener = listener
if listener:
# Call the registration methods for each base class that has them
for cls in listener.__class__.mro():
if hasattr(cls, '_bumble_deregister_composite'):
if '_bumble_deregister_composite' in cls.__dict__:
cls._bumble_register_composite(listener, self)
@@ -276,7 +279,7 @@ class AsyncRunner:
"""
def decorator(func):
@wraps(func)
@functools.wraps(func)
def wrapper(*args, **kwargs):
coroutine = func(*args, **kwargs)
if queue is None:
@@ -410,30 +413,35 @@ class FlowControlAsyncPipe:
self.check_pump()
# -----------------------------------------------------------------------------
async def async_call(function, *args, **kwargs):
"""
Immediately calls the function with provided args and kwargs, wrapping it in an async function.
Rust's `pyo3_asyncio` library needs functions to be marked async to properly inject a running loop.
Immediately calls the function with provided args and kwargs, wrapping it in an
async function.
Rust's `pyo3_asyncio` library needs functions to be marked async to properly inject
a running loop.
result = await async_call(some_function, ...)
"""
return function(*args, **kwargs)
# -----------------------------------------------------------------------------
def wrap_async(function):
"""
Wraps the provided function in an async function.
"""
return partial(async_call, function)
return functools.partial(async_call, function)
# -----------------------------------------------------------------------------
def deprecated(msg: str):
"""
Throw deprecation warning before execution.
"""
def wrapper(function):
@wraps(function)
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, DeprecationWarning)
return function(*args, **kwargs)
@@ -443,13 +451,14 @@ def deprecated(msg: str):
return wrapper
# -----------------------------------------------------------------------------
def experimental(msg: str):
"""
Throws a future warning before execution.
"""
def wrapper(function):
@wraps(function)
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)
@@ -457,3 +466,22 @@ def experimental(msg: str):
return inner
return wrapper
# -----------------------------------------------------------------------------
class OpenIntEnum(enum.IntEnum):
"""
Subclass of enum.IntEnum that can hold integer values outside the set of
predefined values. This is convenient for implementing protocols where some
integer constants may be added over time.
"""
@classmethod
def _missing_(cls, value):
if not isinstance(value, int):
return None
obj = int.__new__(cls, value)
obj._value_ = value
obj._name_ = f"{cls.__name__}[{value}]"
return obj

274
examples/avrcp_as_sink.html Normal file
View File

@@ -0,0 +1,274 @@
<html>
<head>
<style>
* {
font-family: sans-serif;
}
</style>
</head>
<body>
Server Port <input id="port" type="text" value="8989"></input> <button id="connectButton" onclick="connect()">Connect</button><br>
<div id="socketState"></div>
<br>
<div id="buttons"></div><br>
<hr>
<button onclick="onGetPlayStatusButtonClicked()">Get Play Status</button><br>
<div id="getPlayStatusResponseTable"></div>
<hr>
<button onclick="onGetElementAttributesButtonClicked()">Get Element Attributes</button><br>
<div id="getElementAttributesResponseTable"></div>
<hr>
<table>
<tr>
<b>VOLUME</b>:
<button onclick="onVolumeDownButtonClicked()">-</button>
<button onclick="onVolumeUpButtonClicked()">+</button>&nbsp;
<span id="volumeText"></span><br>
</tr>
<tr>
<td><b>PLAYBACK STATUS</b></td><td><span id="playbackStatusText"></span></td>
</tr>
<tr>
<td><b>POSITION</b></td><td><span id="positionText"></span></td>
</tr>
<tr>
<td><b>TRACK</b></td><td><span id="trackText"></span></td>
</tr>
<tr>
<td><b>ADDRESSED PLAYER</b></td><td><span id="addressedPlayerText"></span></td>
</tr>
<tr>
<td><b>UID COUNTER</b></td><td><span id="uidCounterText"></span></td>
</tr>
<tr>
<td><b>SUPPORTED EVENTS</b></td><td><span id="supportedEventsText"></span></td>
</tr>
<tr>
<td><b>PLAYER SETTINGS</b></td><td><div id="playerSettingsTable"></div></td>
</tr>
</table>
<script>
const portInput = document.getElementById("port")
const connectButton = document.getElementById("connectButton")
const socketState = document.getElementById("socketState")
const volumeText = document.getElementById("volumeText")
const positionText = document.getElementById("positionText")
const trackText = document.getElementById("trackText")
const playbackStatusText = document.getElementById("playbackStatusText")
const addressedPlayerText = document.getElementById("addressedPlayerText")
const uidCounterText = document.getElementById("uidCounterText")
const supportedEventsText = document.getElementById("supportedEventsText")
const playerSettingsTable = document.getElementById("playerSettingsTable")
const getPlayStatusResponseTable = document.getElementById("getPlayStatusResponseTable")
const getElementAttributesResponseTable = document.getElementById("getElementAttributesResponseTable")
let socket
let volume = 0
const keyNames = [
"SELECT",
"UP",
"DOWN",
"LEFT",
"RIGHT",
"RIGHT_UP",
"RIGHT_DOWN",
"LEFT_UP",
"LEFT_DOWN",
"ROOT_MENU",
"SETUP_MENU",
"CONTENTS_MENU",
"FAVORITE_MENU",
"EXIT",
"NUMBER_0",
"NUMBER_1",
"NUMBER_2",
"NUMBER_3",
"NUMBER_4",
"NUMBER_5",
"NUMBER_6",
"NUMBER_7",
"NUMBER_8",
"NUMBER_9",
"DOT",
"ENTER",
"CLEAR",
"CHANNEL_UP",
"CHANNEL_DOWN",
"PREVIOUS_CHANNEL",
"SOUND_SELECT",
"INPUT_SELECT",
"DISPLAY_INFORMATION",
"HELP",
"PAGE_UP",
"PAGE_DOWN",
"POWER",
"VOLUME_UP",
"VOLUME_DOWN",
"MUTE",
"PLAY",
"STOP",
"PAUSE",
"RECORD",
"REWIND",
"FAST_FORWARD",
"EJECT",
"FORWARD",
"BACKWARD",
"ANGLE",
"SUBPICTURE",
"F1",
"F2",
"F3",
"F4",
"F5",
]
document.addEventListener('keydown', onKeyDown)
document.addEventListener('keyup', onKeyUp)
const buttons = document.getElementById("buttons")
keyNames.forEach(name => {
const button = document.createElement("BUTTON")
button.appendChild(document.createTextNode(name))
button.addEventListener("mousedown", event => {
send({type: 'send-key-down', key: name})
})
button.addEventListener("mouseup", event => {
send({type: 'send-key-up', key: name})
})
buttons.appendChild(button)
})
updateVolume(0)
function connect() {
socket = new WebSocket(`ws://localhost:${portInput.value}`);
socket.onopen = _ => {
socketState.innerText = 'OPEN'
connectButton.disabled = true
}
socket.onclose = _ => {
socketState.innerText = 'CLOSED'
connectButton.disabled = false
}
socket.onerror = (error) => {
socketState.innerText = 'ERROR'
console.log(`ERROR: ${error}`)
connectButton.disabled = false
}
socket.onmessage = (message) => {
onMessage(JSON.parse(message.data))
}
}
function send(message) {
if (socket && socket.readyState == WebSocket.OPEN) {
socket.send(JSON.stringify(message))
}
}
function hmsText(position) {
const h_1 = 1000 * 60 * 60
const h = Math.floor(position / h_1)
position -= h * h_1
const m_1 = 1000 * 60
const m = Math.floor(position / m_1)
position -= m * m_1
const s_1 = 1000
const s = Math.floor(position / s_1)
position -= s * s_1
return `${h}:${m.toString().padStart(2, "0")}:${s.toString().padStart(2, "0")}:${position}`
}
function setTableHead(table, columns) {
let thead = table.createTHead()
let row = thead.insertRow()
for (let column of columns) {
let th = document.createElement("th")
let text = document.createTextNode(column)
th.appendChild(text)
row.appendChild(th)
}
}
function createTable(rows) {
const table = document.createElement("table")
if (rows.length != 0) {
columns = Object.keys(rows[0])
setTableHead(table, columns)
}
for (let element of rows) {
let row = table.insertRow()
for (key in element) {
let cell = row.insertCell()
let text = document.createTextNode(element[key])
cell.appendChild(text)
}
}
return table
}
function onMessage(message) {
console.log(message)
if (message.type == "set-volume") {
updateVolume(message.params.volume)
} else if (message.type == "supported-events") {
supportedEventsText.innerText = JSON.stringify(message.params.events)
} else if (message.type == "playback-position-changed") {
positionText.innerText = hmsText(message.params.position)
} else if (message.type == "playback-status-changed") {
playbackStatusText.innerText = message.params.status
} else if (message.type == "player-settings-changed") {
playerSettingsTable.replaceChildren(message.params.settings)
} else if (message.type == "track-changed") {
trackText.innerText = message.params.identifier
} else if (message.type == "addressed-player-changed") {
addressedPlayerText.innerText = JSON.stringify(message.params.player)
} else if (message.type == "uids-changed") {
uidCounterText.innerText = message.params.uid_counter
} else if (message.type == "get-play-status-response") {
getPlayStatusResponseTable.replaceChildren(message.params)
} else if (message.type == "get-element-attributes-response") {
getElementAttributesResponseTable.replaceChildren(createTable(message.params))
}
}
function updateVolume(newVolume) {
volume = newVolume
volumeText.innerText = `${volume} (${Math.round(100*volume/0x7F)}%)`
}
function onKeyDown(event) {
console.log(event)
send({ type: 'send-key-down', key: event.key })
}
function onKeyUp(event) {
console.log(event)
send({ type: 'send-key-up', key: event.key })
}
function onVolumeUpButtonClicked() {
updateVolume(Math.min(volume + 5, 0x7F))
send({ type: 'set-volume', volume })
}
function onVolumeDownButtonClicked() {
updateVolume(Math.max(volume - 5, 0))
send({ type: 'set-volume', volume })
}
function onGetPlayStatusButtonClicked() {
send({ type: 'get-play-status', volume })
}
function onGetElementAttributesButtonClicked() {
send({ type: 'get-element-attributes' })
}
</script>
</body>
</html>

View File

@@ -19,9 +19,11 @@ import asyncio
import logging
import sys
import os
import struct
from bumble.core import AdvertisingData
from bumble.device import AdvertisingType, Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link
@@ -52,6 +54,16 @@ async def main():
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
if advertising_type.is_scannable:
device.scan_response_data = bytes(
AdvertisingData(
[
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340)),
]
)
)
await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()

408
examples/run_avrcp.py Normal file
View File

@@ -0,0 +1,408 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import json
import sys
import os
import logging
import websockets
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble import avc
from bumble import avrcp
from bumble import avdtp
from bumble import a2dp
from bumble import utils
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def sdp_records():
a2dp_sink_service_record_handle = 0x00010001
avrcp_controller_service_record_handle = 0x00010002
avrcp_target_service_record_handle = 0x00010003
# pylint: disable=line-too-long
return {
a2dp_sink_service_record_handle: a2dp.make_audio_sink_service_sdp_records(
a2dp_sink_service_record_handle
),
avrcp_controller_service_record_handle: avrcp.make_controller_service_sdp_records(
avrcp_controller_service_record_handle
),
avrcp_target_service_record_handle: avrcp.make_target_service_sdp_records(
avrcp_controller_service_record_handle
),
}
# -----------------------------------------------------------------------------
def codec_capabilities():
return avdtp.MediaCodecCapabilities(
media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=a2dp.A2DP_SBC_CODEC_TYPE,
media_codec_information=a2dp.SbcMediaCodecInformation.from_lists(
sampling_frequencies=[48000, 44100, 32000, 16000],
channel_modes=[
a2dp.SBC_MONO_CHANNEL_MODE,
a2dp.SBC_DUAL_CHANNEL_MODE,
a2dp.SBC_STEREO_CHANNEL_MODE,
a2dp.SBC_JOINT_STEREO_CHANNEL_MODE,
],
block_lengths=[4, 8, 12, 16],
subbands=[4, 8],
allocation_methods=[
a2dp.SBC_LOUDNESS_ALLOCATION_METHOD,
a2dp.SBC_SNR_ALLOCATION_METHOD,
],
minimum_bitpool_value=2,
maximum_bitpool_value=53,
),
)
# -----------------------------------------------------------------------------
def on_avdtp_connection(server):
# Add a sink endpoint to the server
sink = server.add_sink(codec_capabilities())
sink.on('rtp_packet', on_rtp_packet)
# -----------------------------------------------------------------------------
def on_rtp_packet(packet):
print(f'RTP: {packet}')
# -----------------------------------------------------------------------------
def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketServer):
async def get_supported_events():
events = await avrcp_protocol.get_supported_events()
print("SUPPORTED EVENTS:", events)
websocket_server.send_message(
{
"type": "supported-events",
"params": {"events": [event.name for event in events]},
}
)
if avrcp.EventId.TRACK_CHANGED in events:
utils.AsyncRunner.spawn(monitor_track_changed())
if avrcp.EventId.PLAYBACK_STATUS_CHANGED in events:
utils.AsyncRunner.spawn(monitor_playback_status())
if avrcp.EventId.PLAYBACK_POS_CHANGED in events:
utils.AsyncRunner.spawn(monitor_playback_position())
if avrcp.EventId.PLAYER_APPLICATION_SETTING_CHANGED in events:
utils.AsyncRunner.spawn(monitor_player_application_settings())
if avrcp.EventId.AVAILABLE_PLAYERS_CHANGED in events:
utils.AsyncRunner.spawn(monitor_available_players())
if avrcp.EventId.ADDRESSED_PLAYER_CHANGED in events:
utils.AsyncRunner.spawn(monitor_addressed_player())
if avrcp.EventId.UIDS_CHANGED in events:
utils.AsyncRunner.spawn(monitor_uids())
if avrcp.EventId.VOLUME_CHANGED in events:
utils.AsyncRunner.spawn(monitor_volume())
utils.AsyncRunner.spawn(get_supported_events())
async def monitor_track_changed():
async for identifier in avrcp_protocol.monitor_track_changed():
print("TRACK CHANGED:", identifier.hex())
websocket_server.send_message(
{"type": "track-changed", "params": {"identifier": identifier.hex()}}
)
async def monitor_playback_status():
async for playback_status in avrcp_protocol.monitor_playback_status():
print("PLAYBACK STATUS CHANGED:", playback_status.name)
websocket_server.send_message(
{
"type": "playback-status-changed",
"params": {"status": playback_status.name},
}
)
async def monitor_playback_position():
async for playback_position in avrcp_protocol.monitor_playback_position(
playback_interval=1
):
print("PLAYBACK POSITION CHANGED:", playback_position)
websocket_server.send_message(
{
"type": "playback-position-changed",
"params": {"position": playback_position},
}
)
async def monitor_player_application_settings():
async for settings in avrcp_protocol.monitor_player_application_settings():
print("PLAYER APPLICATION SETTINGS:", settings)
settings_as_dict = [
{"attribute": setting.attribute_id.name, "value": setting.value_id.name}
for setting in settings
]
websocket_server.send_message(
{
"type": "player-settings-changed",
"params": {"settings": settings_as_dict},
}
)
async def monitor_available_players():
async for _ in avrcp_protocol.monitor_available_players():
print("AVAILABLE PLAYERS CHANGED")
websocket_server.send_message(
{"type": "available-players-changed", "params": {}}
)
async def monitor_addressed_player():
async for player in avrcp_protocol.monitor_addressed_player():
print("ADDRESSED PLAYER CHANGED")
websocket_server.send_message(
{
"type": "addressed-player-changed",
"params": {
"player": {
"player_id": player.player_id,
"uid_counter": player.uid_counter,
}
},
}
)
async def monitor_uids():
async for uid_counter in avrcp_protocol.monitor_uids():
print("UIDS CHANGED")
websocket_server.send_message(
{
"type": "uids-changed",
"params": {
"uid_counter": uid_counter,
},
}
)
async def monitor_volume():
async for volume in avrcp_protocol.monitor_volume():
print("VOLUME CHANGED:", volume)
websocket_server.send_message(
{"type": "volume-changed", "params": {"volume": volume}}
)
# -----------------------------------------------------------------------------
class WebSocketServer:
def __init__(
self, avrcp_protocol: avrcp.Protocol, avrcp_delegate: Delegate
) -> None:
self.socket = None
self.delegate = None
self.avrcp_protocol = avrcp_protocol
self.avrcp_delegate = avrcp_delegate
async def start(self) -> None:
# pylint: disable-next=no-member
await websockets.serve(self.serve, 'localhost', 8989) # type: ignore
async def serve(self, socket, _path) -> None:
print('### WebSocket connected')
self.socket = socket
while True:
try:
message = await socket.recv()
print('Received: ', str(message))
parsed = json.loads(message)
message_type = parsed['type']
if message_type == 'send-key-down':
await self.on_send_key_down(parsed)
elif message_type == 'send-key-up':
await self.on_send_key_up(parsed)
elif message_type == 'set-volume':
await self.on_set_volume(parsed)
elif message_type == 'get-play-status':
await self.on_get_play_status()
elif message_type == 'get-element-attributes':
await self.on_get_element_attributes()
except websockets.exceptions.ConnectionClosedOK:
self.socket = None
break
async def on_send_key_down(self, message: dict) -> None:
key = avc.PassThroughFrame.OperationId[message["key"]]
await self.avrcp_protocol.send_key_event(key, True)
async def on_send_key_up(self, message: dict) -> None:
key = avc.PassThroughFrame.OperationId[message["key"]]
await self.avrcp_protocol.send_key_event(key, False)
async def on_set_volume(self, message: dict) -> None:
volume = message["volume"]
self.avrcp_delegate.volume = volume
self.avrcp_protocol.notify_volume_changed(volume)
async def on_get_play_status(self) -> None:
play_status = await self.avrcp_protocol.get_play_status()
self.send_message(
{
"type": "get-play-status-response",
"params": {
"song_length": play_status.song_length,
"song_position": play_status.song_position,
"play_status": play_status.play_status.name,
},
}
)
async def on_get_element_attributes(self) -> None:
attributes = await self.avrcp_protocol.get_element_attributes(
0,
[
avrcp.MediaAttributeId.TITLE,
avrcp.MediaAttributeId.ARTIST_NAME,
avrcp.MediaAttributeId.ALBUM_NAME,
avrcp.MediaAttributeId.TRACK_NUMBER,
avrcp.MediaAttributeId.TOTAL_NUMBER_OF_TRACKS,
avrcp.MediaAttributeId.GENRE,
avrcp.MediaAttributeId.PLAYING_TIME,
avrcp.MediaAttributeId.DEFAULT_COVER_ART,
],
)
self.send_message(
{
"type": "get-element-attributes-response",
"params": [
{
"attribute_id": attribute.attribute_id.name,
"attribute_value": attribute.attribute_value,
}
for attribute in attributes
],
}
)
def send_message(self, message: dict) -> None:
if self.socket is None:
print("no socket, dropping message")
return
serialized = json.dumps(message)
utils.AsyncRunner.spawn(self.socket.send(serialized))
# -----------------------------------------------------------------------------
class Delegate(avrcp.Delegate):
def __init__(self):
super().__init__(
[avrcp.EventId.VOLUME_CHANGED, avrcp.EventId.PLAYBACK_STATUS_CHANGED]
)
self.websocket_server = None
async def set_absolute_volume(self, volume: int) -> None:
await super().set_absolute_volume(volume)
if self.websocket_server is not None:
self.websocket_server.send_message(
{"type": "set-volume", "params": {"volume": volume}}
)
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) < 3:
print(
'Usage: run_avrcp_controller.py <device-config> <transport-spec> '
'<sbc-file> [<bt-addr>]'
)
print('example: run_avrcp_controller.py classic1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device.classic_enabled = True
# Setup the SDP to expose the sink service
device.sdp_service_records = sdp_records()
# Start the controller
await device.power_on()
# Create a listener to wait for AVDTP connections
listener = avdtp.Listener(avdtp.Listener.create_registrar(device))
listener.on('connection', on_avdtp_connection)
avrcp_delegate = Delegate()
avrcp_protocol = avrcp.Protocol(avrcp_delegate)
avrcp_protocol.listen(device)
websocket_server = WebSocketServer(avrcp_protocol, avrcp_delegate)
avrcp_delegate.websocket_server = websocket_server
avrcp_protocol.on(
"start", lambda: on_avrcp_start(avrcp_protocol, websocket_server)
)
await websocket_server.start()
if len(sys.argv) >= 5:
# Connect to the peer
target_address = sys.argv[4]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(
target_address, transport=BT_BR_EDR_TRANSPORT
)
print(f'=== Connected to {connection.peer_address}!')
# Request authentication
print('*** Authenticating...')
await connection.authenticate()
print('*** Authenticated')
# Enable encryption
print('*** Enabling encryption...')
await connection.encrypt()
print('*** Encryption on')
server = await avdtp.Protocol.connect(connection)
listener.set_server(connection, server)
sink = server.add_sink(codec_capabilities())
sink.on('rtp_packet', on_rtp_packet)
await avrcp_protocol.connect(connection)
else:
# Start being discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
await asyncio.get_event_loop().create_future()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -22,10 +22,11 @@ import os
from bumble.device import (
Device,
Connection,
AdvertisingParameters,
AdvertisingEventProperties,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
@@ -61,12 +62,7 @@ async def main() -> None:
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
advertising_set = await devices[0].create_advertising_set()
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC

View File

@@ -98,13 +98,7 @@ async def main() -> None:
)
+ csis.get_advertising_data()
)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await device.create_advertising_set(advertising_data=advertising_data)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]

View File

@@ -19,8 +19,13 @@ import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
from bumble.device import (
AdvertisingParameters,
AdvertisingEventProperties,
AdvertisingType,
Device,
)
from bumble.hci import Address
from bumble.transport import open_transport_or_link
@@ -35,20 +40,16 @@ async def main() -> None:
return
if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
advertising_properties = AdvertisingEventProperties.from_advertising_type(
AdvertisingType(int(sys.argv[3]))
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
advertising_properties = AdvertisingEventProperties()
if len(sys.argv) >= 5:
target = Address(sys.argv[4])
peer_address = Address(sys.argv[4])
else:
target = Address.ANY
peer_address = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
@@ -58,8 +59,11 @@ async def main() -> None:
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=advertising_properties,
peer_address=peer_address,
)
)
await hci_transport.source.terminated

View File

@@ -0,0 +1,99 @@
# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingParameters, AdvertisingEventProperties, Device
from bumble.hci import Address
from bumble.core import AdvertisingData
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_extended_advertiser_2.py <config-file> <transport-spec>')
print('example: run_extended_advertiser_2.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
if not device.supports_le_extended_advertising:
print("Device does not support extended advertising")
return
print("Max advertising sets:", device.host.number_of_supported_advertising_sets)
print(
"Max advertising data length:", device.host.maximum_advertising_data_length
)
if device.host.number_of_supported_advertising_sets >= 1:
advertising_data1 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 1".encode("utf-8"))]
)
set1 = await device.create_advertising_set(
advertising_data=bytes(advertising_data1),
)
print("Selected TX power 1:", set1.selected_tx_power)
advertising_data2 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))]
)
if device.host.number_of_supported_advertising_sets >= 2:
set2 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F1"),
advertising_parameters=AdvertisingParameters(),
advertising_data=bytes(advertising_data2),
auto_start=False,
auto_restart=True,
)
print("Selected TX power 2:", set2.selected_tx_power)
await set2.start()
if device.host.number_of_supported_advertising_sets >= 3:
scan_response_data3 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 3".encode("utf-8"))]
)
set3 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F2"),
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False, is_scannable=True
)
),
scan_response_data=bytes(scan_response_data3),
)
print("Selected TX power 3:", set2.selected_tx_power)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -22,13 +22,12 @@ import os
import struct
import secrets
from bumble.core import AdvertisingData
from bumble.device import Device, CisLink
from bumble.device import Device, CisLink, AdvertisingParameters
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
HCI_IsoDataPacket,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.bap import (
CodecSpecificCapabilities,
@@ -179,11 +178,7 @@ async def main() -> None:
device.once('cis_establishment', on_cis)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_set = await device.create_advertising_set(
advertising_data=advertising_data,
)

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:tools="http://schemas.android.com/tools">
package="com.github.google.bumble.btbench">
<uses-sdk android:minSdkVersion="30" android:targetSdkVersion="34" />
<!-- Request legacy Bluetooth permissions on older devices. -->
<uses-permission android:name="android.permission.BLUETOOTH" android:maxSdkVersion="30" />
<uses-permission android:name="android.permission.BLUETOOTH_ADMIN" android:maxSdkVersion="30" />
@@ -22,11 +22,10 @@
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/Theme.BTBench"
tools:targetApi="31">
>
<activity
android:name=".MainActivity"
android:exported="true"
android:label="@string/app_name"
android:theme="@style/Theme.BTBench">
<intent-filter>
<action android:name="android.intent.action.MAIN" />

View File

@@ -59,6 +59,7 @@ console_scripts =
bumble-ble-rpa-tool = bumble.apps.ble_rpa_tool:main
bumble-console = bumble.apps.console:main
bumble-controller-info = bumble.apps.controller_info:main
bumble-controller-loopback = bumble.apps.controller_loopback:main
bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
@@ -81,15 +82,15 @@ console_scripts =
build =
build >= 0.7
test =
pytest >= 6.2
pytest-asyncio >= 0.17
pytest >= 8.0
pytest-asyncio == 0.21.1
pytest-html >= 3.2.0
coverage >= 6.4
development =
black == 22.10
grpcio-tools >= 1.57.0
invoke >= 1.7.3
mypy == 1.5.0
mypy == 1.8.0
nox >= 2022
pylint == 2.15.8
pyyaml >= 6.0
@@ -98,7 +99,7 @@ development =
types-protobuf >= 4.21.0
avatar =
pandora-avatar == 0.0.5
rootcanal == 1.3.0 ; python_version>='3.10'
rootcanal == 1.7.0 ; python_version>='3.10'
documentation =
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6

246
tests/avrcp_test.py Normal file
View File

@@ -0,0 +1,246 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import struct
import pytest
from bumble import core
from bumble import device
from bumble import host
from bumble import controller
from bumble import link
from bumble import avc
from bumble import avrcp
from bumble import avctp
from bumble.transport import common
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.link = link.LocalLink()
self.controllers = [
controller.Controller('C1', link=self.link, public_address=addresses[0]),
controller.Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
device.Device(
address=addresses[0],
host=host.Host(
self.controllers[0], common.AsyncPipeSink(self.controllers[0])
),
),
device.Device(
address=addresses[1],
host=host.Host(
self.controllers[1], common.AsyncPipeSink(self.controllers[1])
),
),
]
self.devices[0].classic_enabled = True
self.devices[1].classic_enabled = True
self.connections = [None, None]
self.protocols = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
async def setup_connections(self):
await self.devices[0].power_on()
await self.devices[1].power_on()
self.connections = await asyncio.gather(
self.devices[0].connect(
self.devices[1].public_address, core.BT_BR_EDR_TRANSPORT
),
self.devices[1].accept(self.devices[0].public_address),
)
self.protocols = [avrcp.Protocol(), avrcp.Protocol()]
self.protocols[0].listen(self.devices[1])
await self.protocols[1].connect(self.connections[0])
# -----------------------------------------------------------------------------
def test_frame_parser():
with pytest.raises(ValueError) as error:
avc.Frame.from_bytes(bytes.fromhex("11480000"))
x = bytes.fromhex("014D0208")
frame = avc.Frame.from_bytes(x)
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 7
assert frame.opcode == 8
x = bytes.fromhex("014DFF0108")
frame = avc.Frame.from_bytes(x)
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 260
assert frame.opcode == 8
x = bytes.fromhex("0148000019581000000103")
frame = avc.Frame.from_bytes(x)
assert isinstance(frame, avc.CommandFrame)
assert frame.ctype == avc.CommandFrame.CommandType.STATUS
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 0
assert frame.opcode == 0
# -----------------------------------------------------------------------------
def test_vendor_dependent_command():
x = bytes.fromhex("0148000019581000000103")
frame = avc.Frame.from_bytes(x)
assert isinstance(frame, avc.VendorDependentCommandFrame)
assert frame.company_id == 0x1958
assert frame.vendor_dependent_data == bytes.fromhex("1000000103")
frame = avc.VendorDependentCommandFrame(
avc.CommandFrame.CommandType.STATUS,
avc.Frame.SubunitType.PANEL,
0,
0x1958,
bytes.fromhex("1000000103"),
)
assert bytes(frame) == x
# -----------------------------------------------------------------------------
def test_avctp_message_assembler():
received_message = []
def on_message(transaction_label, is_response, ipid, pid, payload):
received_message.append((transaction_label, is_response, ipid, pid, payload))
assembler = avctp.MessageAssembler(on_message)
payload = bytes.fromhex("01")
assembler.on_pdu(bytes([1 << 4 | 0b00 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
received_message = []
payload = bytes.fromhex("010203")
assembler.on_pdu(bytes([1 << 4 | 0b01 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert len(received_message) == 0
assembler.on_pdu(bytes([1 << 4 | 0b00 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
received_message = []
payload = bytes.fromhex("010203")
assembler.on_pdu(
bytes([1 << 4 | 0b01 << 2 | 1 << 1 | 0, 3, 0x11, 0x22]) + payload[0:1]
)
assembler.on_pdu(
bytes([1 << 4 | 0b10 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload[1:2]
)
assembler.on_pdu(
bytes([1 << 4 | 0b11 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload[2:3]
)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
# received_message = []
# parameter = bytes.fromhex("010203")
# assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, len(parameter)) + parameter)
# assert len(received_message) == 0
# -----------------------------------------------------------------------------
def test_avrcp_pdu_assembler():
received_pdus = []
def on_pdu(pdu_id, parameter):
received_pdus.append((pdu_id, parameter))
assembler = avrcp.PduAssembler(on_pdu)
parameter = bytes.fromhex("01")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b00, len(parameter)) + parameter)
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b01, len(parameter)) + parameter)
assert len(received_pdus) == 0
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b00, len(parameter)) + parameter)
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b01, 1) + parameter[0:1])
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b10, 1) + parameter[1:2])
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, 1) + parameter[2:3])
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, len(parameter)) + parameter)
assert len(received_pdus) == 0
def test_passthrough_commands():
play_pressed = avc.PassThroughCommandFrame(
avc.CommandFrame.CommandType.CONTROL,
avc.CommandFrame.SubunitType.PANEL,
0,
avc.PassThroughCommandFrame.StateFlag.PRESSED,
avc.PassThroughCommandFrame.OperationId.PLAY,
b'',
)
play_pressed_bytes = bytes(play_pressed)
parsed = avc.Frame.from_bytes(play_pressed_bytes)
assert isinstance(parsed, avc.PassThroughCommandFrame)
assert parsed.operation_id == avc.PassThroughCommandFrame.OperationId.PLAY
assert bytes(parsed) == play_pressed_bytes
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_supported_events():
two_devices = TwoDevices()
await two_devices.setup_connections()
supported_events = await two_devices.protocols[0].get_supported_events()
assert supported_events == []
delegate1 = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED])
two_devices.protocols[0].delegate = delegate1
supported_events = await two_devices.protocols[1].get_supported_events()
assert supported_events == [avrcp.EventId.VOLUME_CHANGED]
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_frame_parser()
test_vendor_dependent_command()
test_avctp_message_assembler()
test_avrcp_pdu_assembler()
test_passthrough_commands()
test_get_supported_events()

View File

@@ -28,7 +28,7 @@ from bumble.core import (
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import Connection, Device
from bumble.device import AdvertisingParameters, Connection, Device
from bumble.host import AclPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -50,7 +50,8 @@ from bumble.gatt import (
GATT_APPEARANCE_CHARACTERISTIC,
)
from .test_utils import TwoDevices
from .test_utils import TwoDevices, async_barrier
# -----------------------------------------------------------------------------
# Logging
@@ -254,12 +255,12 @@ async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_legacy_advertising()
assert device.legacy_advertiser
await device.start_advertising()
assert device.is_advertising
# Stop advertising
await advertiser.stop()
assert not device.legacy_advertiser
await device.stop_advertising()
assert not device.is_advertising
# -----------------------------------------------------------------------------
@@ -273,7 +274,7 @@ async def test_legacy_advertising_connection(own_address_type):
peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising
advertiser = await device.start_legacy_advertising()
await device.start_advertising()
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
@@ -301,7 +302,7 @@ async def test_legacy_advertising_connection(own_address_type):
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart)
await device.start_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
@@ -310,20 +311,18 @@ async def test_legacy_advertising_disconnection(auto_restart):
ConnectionParameters(0, 0, 0),
)
device.start_legacy_advertising = mock.AsyncMock()
device.on_advertising_set_termination(
HCI_SUCCESS, device.legacy_advertising_set.advertising_handle, 0x0001, 0
)
device.on_disconnection(0x0001, 0)
await async_barrier()
await async_barrier()
if auto_restart:
device.start_legacy_advertising.assert_called_with(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
assert device.is_advertising
else:
device.start_legacy_advertising.assert_not_called()
assert not device.is_advertising
# -----------------------------------------------------------------------------
@@ -332,12 +331,13 @@ async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_extended_advertising()
assert device.extended_advertisers
advertising_set = await device.create_advertising_set()
assert device.extended_advertising_sets
assert advertising_set.enabled
# Stop advertising
await advertiser.stop()
assert not device.extended_advertisers
await advertising_set.stop()
assert not advertising_set.enabled
# -----------------------------------------------------------------------------
@@ -349,8 +349,8 @@ async def test_extended_advertising():
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(
own_address_type=own_address_type
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
)
device.on_connection(
0x0001,
@@ -361,8 +361,9 @@ async def test_extended_advertising_connection(own_address_type):
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
advertising_set.advertising_handle,
0x0001,
0,
)
if own_address_type == OwnAddressType.PUBLIC:
@@ -375,45 +376,6 @@ async def test_extended_advertising_connection(own_address_type):
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
device.start_extended_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_extended_advertising.assert_not_called()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features():
@@ -423,6 +385,54 @@ async def test_get_remote_le_features():
assert (await devices.connections[0].get_remote_le_features()) is not None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cis():
devices = TwoDevices()
await devices.setup_connection()
peripheral_cis_futures = {}
def on_cis_request(
acl_connection: Connection,
cis_handle: int,
_cig_id: int,
_cis_id: int,
):
acl_connection.abort_on(
'disconnection', devices[1].accept_cis_request(cis_handle)
)
peripheral_cis_futures[cis_handle] = asyncio.get_running_loop().create_future()
devices[1].on('cis_request', on_cis_request)
devices[1].on(
'cis_establishment',
lambda cis_link: peripheral_cis_futures[cis_link.handle].set_result(None),
)
cis_handles = await devices[0].setup_cig(
cig_id=1,
cis_id=[2, 3],
sdu_interval=(0, 0),
framing=0,
max_sdu=(0, 0),
retransmission_number=0,
max_transport_latency=(0, 0),
)
assert len(cis_handles) == 2
cis_links = await devices[0].create_cis(
[
(cis_handles[0], devices.connections[0].handle),
(cis_handles[1], devices.connections[0].handle),
]
)
await asyncio.gather(*peripheral_cis_futures.values())
assert len(cis_links) == 2
await cis_links[0].disconnect()
await cis_links[1].disconnect()
# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
device = Device(host=Host(None, None))

View File

@@ -50,6 +50,7 @@ from bumble.att import (
ATT_Error_Response,
ATT_Read_By_Group_Type_Request,
)
from .test_utils import async_barrier
# -----------------------------------------------------------------------------
@@ -456,13 +457,6 @@ class LinkedDevices:
self.paired = [None, None, None]
# -----------------------------------------------------------------------------
async def async_barrier():
ready = asyncio.get_running_loop().create_future()
asyncio.get_running_loop().call_soon(ready.set_result, None)
await ready
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():

View File

@@ -23,6 +23,8 @@ from bumble.hci import (
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND,
HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
Address,
CodingFormat,
CodecID,
@@ -274,8 +276,14 @@ def test_HCI_Set_Event_Mask_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Event_Mask_Command():
command = HCI_LE_Set_Event_Mask_Command(
le_event_mask=bytes.fromhex('0011223344556677')
le_event_mask=HCI_LE_Set_Event_Mask_Command.mask(
[
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
]
)
)
assert command.le_event_mask == bytes.fromhex('0100000000010000')
basic_check(command)

View File

@@ -24,11 +24,11 @@ from typing import Tuple
from .test_utils import TwoDevices
from bumble import core
from bumble import device
from bumble import hfp
from bumble import rfcomm
from bumble import hci
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -109,7 +109,7 @@ async def test_sco_setup():
devices[1].accept(devices[0].public_address),
)
def on_sco_request(_connection: device.Connection, _link_type: int):
def on_sco_request(_connection, _link_type: int):
connections[1].abort_on(
'disconnection',
devices[1].send_command(
@@ -124,17 +124,13 @@ async def test_sco_setup():
devices[1].on('sco_request', on_sco_request)
sco_connections = [
sco_connection_futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
devices[0].on(
'sco_connection', lambda sco_link: sco_connections[0].set_result(sco_link)
)
devices[1].on(
'sco_connection', lambda sco_link: sco_connections[1].set_result(sco_link)
)
for device, future in zip(devices, sco_connection_futures):
device.on('sco_connection', future.set_result)
await devices[0].send_command(
hci.HCI_Enhanced_Setup_Synchronous_Connection_Command(
@@ -142,8 +138,17 @@ async def test_sco_setup():
**hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1].asdict(),
)
)
sco_connections = await asyncio.gather(*sco_connection_futures)
await asyncio.gather(*sco_connections)
sco_disconnection_futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
for future, sco_connection in zip(sco_disconnection_futures, sco_connections):
sco_connection.on('disconnection', future.set_result)
await sco_connections[0].disconnect()
await asyncio.gather(*sco_disconnection_futures)
# -----------------------------------------------------------------------------

View File

@@ -547,6 +547,13 @@ async def test_self_smp_over_classic():
MockSmpSession.send_public_key_command.assert_not_called()
MockSmpSession.send_pairing_random_command.assert_not_called()
for i in range(2):
assert (
await two_devices.devices[i].keystore.get(
str(two_devices.connections[i].peer_address)
)
).link_key
# -----------------------------------------------------------------------------
@pytest.mark.asyncio

View File

@@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
from typing import List, Optional
from bumble.controller import Controller
@@ -22,6 +26,7 @@ from bumble.transport import AsyncPipeSink
from bumble.hci import Address
# -----------------------------------------------------------------------------
class TwoDevices:
connections: List[Optional[Connection]]
@@ -75,3 +80,10 @@ class TwoDevices:
def __getitem__(self, index: int) -> Device:
return self.devices[index]
# -----------------------------------------------------------------------------
async def async_barrier():
ready = asyncio.get_running_loop().create_future()
asyncio.get_running_loop().call_soon(ready.set_result, None)
await ready

View File

@@ -12,15 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import contextlib
import logging
import os
from bumble import utils
from pyee import EventEmitter
from unittest.mock import MagicMock
from pyee import EventEmitter
from bumble import utils
# -----------------------------------------------------------------------------
def test_on() -> None:
emitter = EventEmitter()
with contextlib.closing(utils.EventWatcher()) as context:
@@ -33,6 +38,7 @@ def test_on() -> None:
assert mock.call_count == 1
# -----------------------------------------------------------------------------
def test_on_decorator() -> None:
emitter = EventEmitter()
with contextlib.closing(utils.EventWatcher()) as context:
@@ -48,6 +54,7 @@ def test_on_decorator() -> None:
assert mock.call_count == 1
# -----------------------------------------------------------------------------
def test_multiple_handlers() -> None:
emitter = EventEmitter()
with contextlib.closing(utils.EventWatcher()) as context:
@@ -64,6 +71,30 @@ def test_multiple_handlers() -> None:
mock.assert_called_once_with('b')
# -----------------------------------------------------------------------------
def test_open_int_enums():
class Foo(utils.OpenIntEnum):
FOO = 1
BAR = 2
BLA = 3
x = Foo(1)
assert x.name == "FOO"
assert x.value == 1
assert int(x) == 1
assert x == 1
assert x + 1 == 2
x = Foo(4)
assert x.name == "Foo[4]"
assert x.value == 4
assert int(x) == 4
assert x == 4
assert x + 1 == 5
print(list(Foo))
# -----------------------------------------------------------------------------
def run_tests():
test_on()
@@ -75,3 +106,4 @@ def run_tests():
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
run_tests()
test_open_int_enums()