forked from auracaster/bumble_mirror
Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e7f9acb421 | |||
| 976e6cce57 | |||
| dfdf37019c | |||
| 56ca19600b | |||
| cd9feeb455 | |||
| f8e5b88be6 | |||
| 0f71a63b42 | |||
| b7259abe3c | |||
| 00e660d410 | |||
| 88e3a2b87f | |||
| aa658418bc | |||
| ac0cff43b6 | |||
| 8051c23375 | |||
| 7b34bb4050 | |||
| fe38ab35cf | |||
| 65a9102ba1 |
+32
-5
@@ -46,6 +46,12 @@ from bumble.att import (
|
||||
ATT_INSUFFICIENT_AUTHENTICATION_ERROR,
|
||||
ATT_INSUFFICIENT_ENCRYPTION_ERROR,
|
||||
)
|
||||
from bumble.utils import AsyncRunner
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
POST_PAIRING_DELAY = 1
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -235,8 +241,10 @@ def on_connection(connection, request):
|
||||
|
||||
# Listen for pairing events
|
||||
connection.on('pairing_start', on_pairing_start)
|
||||
connection.on('pairing', lambda keys: on_pairing(connection.peer_address, keys))
|
||||
connection.on('pairing_failure', on_pairing_failure)
|
||||
connection.on('pairing', lambda keys: on_pairing(connection, keys))
|
||||
connection.on(
|
||||
'pairing_failure', lambda reason: on_pairing_failure(connection, reason)
|
||||
)
|
||||
|
||||
# Listen for encryption changes
|
||||
connection.on(
|
||||
@@ -270,19 +278,24 @@ def on_pairing_start():
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def on_pairing(address, keys):
|
||||
@AsyncRunner.run_in_task()
|
||||
async def on_pairing(connection, keys):
|
||||
print(color('***-----------------------------------', 'cyan'))
|
||||
print(color(f'*** Paired! (peer identity={address})', 'cyan'))
|
||||
print(color(f'*** Paired! (peer identity={connection.peer_address})', 'cyan'))
|
||||
keys.print(prefix=color('*** ', 'cyan'))
|
||||
print(color('***-----------------------------------', 'cyan'))
|
||||
await asyncio.sleep(POST_PAIRING_DELAY)
|
||||
await connection.disconnect()
|
||||
Waiter.instance.terminate()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def on_pairing_failure(reason):
|
||||
@AsyncRunner.run_in_task()
|
||||
async def on_pairing_failure(connection, reason):
|
||||
print(color('***-----------------------------------', 'red'))
|
||||
print(color(f'*** Pairing failed: {smp_error_name(reason)}', 'red'))
|
||||
print(color('***-----------------------------------', 'red'))
|
||||
await connection.disconnect()
|
||||
Waiter.instance.terminate()
|
||||
|
||||
|
||||
@@ -293,6 +306,7 @@ async def pair(
|
||||
mitm,
|
||||
bond,
|
||||
ctkd,
|
||||
identity_address,
|
||||
linger,
|
||||
io,
|
||||
oob,
|
||||
@@ -382,11 +396,18 @@ async def pair(
|
||||
oob_contexts = None
|
||||
|
||||
# Set up a pairing config factory
|
||||
if identity_address == 'public':
|
||||
identity_address_type = PairingConfig.AddressType.PUBLIC
|
||||
elif identity_address == 'random':
|
||||
identity_address_type = PairingConfig.AddressType.RANDOM
|
||||
else:
|
||||
identity_address_type = None
|
||||
device.pairing_config_factory = lambda connection: PairingConfig(
|
||||
sc=sc,
|
||||
mitm=mitm,
|
||||
bonding=bond,
|
||||
oob=oob_contexts,
|
||||
identity_address_type=identity_address_type,
|
||||
delegate=Delegate(mode, connection, io, prompt),
|
||||
)
|
||||
|
||||
@@ -457,6 +478,10 @@ class LogHandler(logging.Handler):
|
||||
help='Enable CTKD',
|
||||
show_default=True,
|
||||
)
|
||||
@click.option(
|
||||
'--identity-address',
|
||||
type=click.Choice(['random', 'public']),
|
||||
)
|
||||
@click.option('--linger', default=False, is_flag=True, help='Linger after pairing')
|
||||
@click.option(
|
||||
'--io',
|
||||
@@ -493,6 +518,7 @@ def main(
|
||||
mitm,
|
||||
bond,
|
||||
ctkd,
|
||||
identity_address,
|
||||
linger,
|
||||
io,
|
||||
oob,
|
||||
@@ -518,6 +544,7 @@ def main(
|
||||
mitm,
|
||||
bond,
|
||||
ctkd,
|
||||
identity_address,
|
||||
linger,
|
||||
io,
|
||||
oob,
|
||||
|
||||
+55
-39
@@ -23,6 +23,7 @@
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import functools
|
||||
import inspect
|
||||
@@ -41,6 +42,7 @@ from typing import (
|
||||
|
||||
from pyee import EventEmitter
|
||||
|
||||
from bumble import utils
|
||||
from bumble.core import UUID, name_or_number, ProtocolError
|
||||
from bumble.hci import HCI_Object, key_with_value
|
||||
from bumble.colors import color
|
||||
@@ -145,43 +147,57 @@ ATT_RESPONSES = [
|
||||
ATT_EXECUTE_WRITE_RESPONSE
|
||||
]
|
||||
|
||||
ATT_INVALID_HANDLE_ERROR = 0x01
|
||||
ATT_READ_NOT_PERMITTED_ERROR = 0x02
|
||||
ATT_WRITE_NOT_PERMITTED_ERROR = 0x03
|
||||
ATT_INVALID_PDU_ERROR = 0x04
|
||||
ATT_INSUFFICIENT_AUTHENTICATION_ERROR = 0x05
|
||||
ATT_REQUEST_NOT_SUPPORTED_ERROR = 0x06
|
||||
ATT_INVALID_OFFSET_ERROR = 0x07
|
||||
ATT_INSUFFICIENT_AUTHORIZATION_ERROR = 0x08
|
||||
ATT_PREPARE_QUEUE_FULL_ERROR = 0x09
|
||||
ATT_ATTRIBUTE_NOT_FOUND_ERROR = 0x0A
|
||||
ATT_ATTRIBUTE_NOT_LONG_ERROR = 0x0B
|
||||
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C
|
||||
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR = 0x0D
|
||||
ATT_UNLIKELY_ERROR_ERROR = 0x0E
|
||||
ATT_INSUFFICIENT_ENCRYPTION_ERROR = 0x0F
|
||||
ATT_UNSUPPORTED_GROUP_TYPE_ERROR = 0x10
|
||||
ATT_INSUFFICIENT_RESOURCES_ERROR = 0x11
|
||||
class ErrorCode(utils.OpenIntEnum):
|
||||
'''
|
||||
See
|
||||
|
||||
ATT_ERROR_NAMES = {
|
||||
ATT_INVALID_HANDLE_ERROR: 'ATT_INVALID_HANDLE_ERROR',
|
||||
ATT_READ_NOT_PERMITTED_ERROR: 'ATT_READ_NOT_PERMITTED_ERROR',
|
||||
ATT_WRITE_NOT_PERMITTED_ERROR: 'ATT_WRITE_NOT_PERMITTED_ERROR',
|
||||
ATT_INVALID_PDU_ERROR: 'ATT_INVALID_PDU_ERROR',
|
||||
ATT_INSUFFICIENT_AUTHENTICATION_ERROR: 'ATT_INSUFFICIENT_AUTHENTICATION_ERROR',
|
||||
ATT_REQUEST_NOT_SUPPORTED_ERROR: 'ATT_REQUEST_NOT_SUPPORTED_ERROR',
|
||||
ATT_INVALID_OFFSET_ERROR: 'ATT_INVALID_OFFSET_ERROR',
|
||||
ATT_INSUFFICIENT_AUTHORIZATION_ERROR: 'ATT_INSUFFICIENT_AUTHORIZATION_ERROR',
|
||||
ATT_PREPARE_QUEUE_FULL_ERROR: 'ATT_PREPARE_QUEUE_FULL_ERROR',
|
||||
ATT_ATTRIBUTE_NOT_FOUND_ERROR: 'ATT_ATTRIBUTE_NOT_FOUND_ERROR',
|
||||
ATT_ATTRIBUTE_NOT_LONG_ERROR: 'ATT_ATTRIBUTE_NOT_LONG_ERROR',
|
||||
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR',
|
||||
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR: 'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR',
|
||||
ATT_UNLIKELY_ERROR_ERROR: 'ATT_UNLIKELY_ERROR_ERROR',
|
||||
ATT_INSUFFICIENT_ENCRYPTION_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_ERROR',
|
||||
ATT_UNSUPPORTED_GROUP_TYPE_ERROR: 'ATT_UNSUPPORTED_GROUP_TYPE_ERROR',
|
||||
ATT_INSUFFICIENT_RESOURCES_ERROR: 'ATT_INSUFFICIENT_RESOURCES_ERROR'
|
||||
}
|
||||
* Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
|
||||
* Core Specification Supplement: Common Profile And Service Error Codes
|
||||
'''
|
||||
INVALID_HANDLE = 0x01
|
||||
READ_NOT_PERMITTED = 0x02
|
||||
WRITE_NOT_PERMITTED = 0x03
|
||||
INVALID_PDU = 0x04
|
||||
INSUFFICIENT_AUTHENTICATION = 0x05
|
||||
REQUEST_NOT_SUPPORTED = 0x06
|
||||
INVALID_OFFSET = 0x07
|
||||
INSUFFICIENT_AUTHORIZATION = 0x08
|
||||
PREPARE_QUEUE_FULL = 0x09
|
||||
ATTRIBUTE_NOT_FOUND = 0x0A
|
||||
ATTRIBUTE_NOT_LONG = 0x0B
|
||||
INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0C
|
||||
INVALID_ATTRIBUTE_LENGTH = 0x0D
|
||||
UNLIKELY_ERROR = 0x0E
|
||||
INSUFFICIENT_ENCRYPTION = 0x0F
|
||||
UNSUPPORTED_GROUP_TYPE = 0x10
|
||||
INSUFFICIENT_RESOURCES = 0x11
|
||||
DATABASE_OUT_OF_SYNC = 0x12
|
||||
VALUE_NOT_ALLOWED = 0x13
|
||||
# 0x80 – 0x9F: Application Error
|
||||
# 0xE0 – 0xFF: Common Profile and Service Error Codes
|
||||
WRITE_REQUEST_REJECTED = 0xFC
|
||||
CCCD_IMPROPERLY_CONFIGURED = 0xFD
|
||||
PROCEDURE_ALREADY_IN_PROGRESS = 0xFE
|
||||
OUT_OF_RANGE = 0xFF
|
||||
|
||||
# Backward Compatible Constants
|
||||
ATT_INVALID_HANDLE_ERROR = ErrorCode.INVALID_HANDLE
|
||||
ATT_READ_NOT_PERMITTED_ERROR = ErrorCode.READ_NOT_PERMITTED
|
||||
ATT_WRITE_NOT_PERMITTED_ERROR = ErrorCode.WRITE_NOT_PERMITTED
|
||||
ATT_INVALID_PDU_ERROR = ErrorCode.INVALID_PDU
|
||||
ATT_INSUFFICIENT_AUTHENTICATION_ERROR = ErrorCode.INSUFFICIENT_AUTHENTICATION
|
||||
ATT_REQUEST_NOT_SUPPORTED_ERROR = ErrorCode.REQUEST_NOT_SUPPORTED
|
||||
ATT_INVALID_OFFSET_ERROR = ErrorCode.INVALID_OFFSET
|
||||
ATT_INSUFFICIENT_AUTHORIZATION_ERROR = ErrorCode.INSUFFICIENT_AUTHORIZATION
|
||||
ATT_PREPARE_QUEUE_FULL_ERROR = ErrorCode.PREPARE_QUEUE_FULL
|
||||
ATT_ATTRIBUTE_NOT_FOUND_ERROR = ErrorCode.ATTRIBUTE_NOT_FOUND
|
||||
ATT_ATTRIBUTE_NOT_LONG_ERROR = ErrorCode.ATTRIBUTE_NOT_LONG
|
||||
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = ErrorCode.INSUFFICIENT_ENCRYPTION_KEY_SIZE
|
||||
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR = ErrorCode.INVALID_ATTRIBUTE_LENGTH
|
||||
ATT_UNLIKELY_ERROR_ERROR = ErrorCode.UNLIKELY_ERROR
|
||||
ATT_INSUFFICIENT_ENCRYPTION_ERROR = ErrorCode.INSUFFICIENT_ENCRYPTION
|
||||
ATT_UNSUPPORTED_GROUP_TYPE_ERROR = ErrorCode.UNSUPPORTED_GROUP_TYPE
|
||||
ATT_INSUFFICIENT_RESOURCES_ERROR = ErrorCode.INSUFFICIENT_RESOURCES
|
||||
|
||||
ATT_DEFAULT_MTU = 23
|
||||
|
||||
@@ -245,9 +261,9 @@ class ATT_PDU:
|
||||
def pdu_name(op_code):
|
||||
return name_or_number(ATT_PDU_NAMES, op_code, 2)
|
||||
|
||||
@staticmethod
|
||||
def error_name(error_code):
|
||||
return name_or_number(ATT_ERROR_NAMES, error_code, 2)
|
||||
@classmethod
|
||||
def error_name(cls, error_code: int) -> str:
|
||||
return ErrorCode(error_code).name
|
||||
|
||||
@staticmethod
|
||||
def subclass(fields):
|
||||
|
||||
+19
-5
@@ -301,6 +301,8 @@ class Driver(common.Driver):
|
||||
fw_name: str = ""
|
||||
config_name: str = ""
|
||||
|
||||
POST_RESET_DELAY: float = 0.2
|
||||
|
||||
DRIVER_INFOS = [
|
||||
# 8723A
|
||||
DriverInfo(
|
||||
@@ -495,12 +497,24 @@ class Driver(common.Driver):
|
||||
|
||||
@classmethod
|
||||
async def driver_info_for_host(cls, host):
|
||||
await host.send_command(HCI_Reset_Command(), check_result=True)
|
||||
host.ready = True # Needed to let the host know the controller is ready.
|
||||
try:
|
||||
await host.send_command(
|
||||
HCI_Reset_Command(),
|
||||
check_result=True,
|
||||
response_timeout=cls.POST_RESET_DELAY,
|
||||
)
|
||||
host.ready = True # Needed to let the host know the controller is ready.
|
||||
except asyncio.exceptions.TimeoutError:
|
||||
logger.warning("timeout waiting for hci reset, retrying")
|
||||
await host.send_command(HCI_Reset_Command(), check_result=True)
|
||||
host.ready = True
|
||||
|
||||
command = HCI_Read_Local_Version_Information_Command()
|
||||
response = await host.send_command(command, check_result=True)
|
||||
if response.command_opcode != command.op_code:
|
||||
logger.error("failed to probe local version information")
|
||||
return None
|
||||
|
||||
response = await host.send_command(
|
||||
HCI_Read_Local_Version_Information_Command(), check_result=True
|
||||
)
|
||||
local_version = response.return_parameters
|
||||
|
||||
logger.debug(
|
||||
|
||||
+21
-16
@@ -238,22 +238,22 @@ GATT_SEARCH_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x
|
||||
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Content Control Id')
|
||||
|
||||
# Telephone Bearer Service (TBS)
|
||||
GATT_BEARER_PROVIDER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BB4, 'Bearer Provider Name')
|
||||
GATT_BEARER_UCI_CHARACTERISTIC = UUID.from_16_bits(0x2BB5, 'Bearer UCI')
|
||||
GATT_BEARER_TECHNOLOGY_CHARACTERISTIC = UUID.from_16_bits(0x2BB6, 'Bearer Technology')
|
||||
GATT_BEARER_URI_SCHEMES_SUPPORTED_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2BB7, 'Bearer URI Schemes Supported List')
|
||||
GATT_BEARER_SIGNAL_STRENGTH_CHARACTERISTIC = UUID.from_16_bits(0x2BB8, 'Bearer Signal Strength')
|
||||
GATT_BEARER_SIGNAL_STRENGTH_REPORTING_INTERVAL_CHARACTERISTIC = UUID.from_16_bits(0x2BB9, 'Bearer Signal Strength Reporting Interval')
|
||||
GATT_BEARER_LIST_CURRENT_CALLS_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Bearer List Current Calls')
|
||||
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBB, 'Content Control ID')
|
||||
GATT_STATUS_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2BBC, 'Status Flags')
|
||||
GATT_INCOMING_CALL_TARGET_BEARER_URI_CHARACTERISTIC = UUID.from_16_bits(0x2BBD, 'Incoming Call Target Bearer URI')
|
||||
GATT_CALL_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BBE, 'Call State')
|
||||
GATT_CALL_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BBF, 'Call Control Point')
|
||||
GATT_CALL_CONTROL_POINT_OPTIONAL_OPCODES_CHARACTERISTIC = UUID.from_16_bits(0x2BC0, 'Call Control Point Optional Opcodes')
|
||||
GATT_TERMINATION_REASON_CHARACTERISTIC = UUID.from_16_bits(0x2BC1, 'Termination Reason')
|
||||
GATT_INCOMING_CALL_CHARACTERISTIC = UUID.from_16_bits(0x2BC2, 'Incoming Call')
|
||||
GATT_CALL_FRIENDLY_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Call Friendly Name')
|
||||
GATT_BEARER_PROVIDER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BB3, 'Bearer Provider Name')
|
||||
GATT_BEARER_UCI_CHARACTERISTIC = UUID.from_16_bits(0x2BB4, 'Bearer UCI')
|
||||
GATT_BEARER_TECHNOLOGY_CHARACTERISTIC = UUID.from_16_bits(0x2BB5, 'Bearer Technology')
|
||||
GATT_BEARER_URI_SCHEMES_SUPPORTED_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2BB6, 'Bearer URI Schemes Supported List')
|
||||
GATT_BEARER_SIGNAL_STRENGTH_CHARACTERISTIC = UUID.from_16_bits(0x2BB7, 'Bearer Signal Strength')
|
||||
GATT_BEARER_SIGNAL_STRENGTH_REPORTING_INTERVAL_CHARACTERISTIC = UUID.from_16_bits(0x2BB8, 'Bearer Signal Strength Reporting Interval')
|
||||
GATT_BEARER_LIST_CURRENT_CALLS_CHARACTERISTIC = UUID.from_16_bits(0x2BB9, 'Bearer List Current Calls')
|
||||
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Content Control ID')
|
||||
GATT_STATUS_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2BBB, 'Status Flags')
|
||||
GATT_INCOMING_CALL_TARGET_BEARER_URI_CHARACTERISTIC = UUID.from_16_bits(0x2BBC, 'Incoming Call Target Bearer URI')
|
||||
GATT_CALL_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BBD, 'Call State')
|
||||
GATT_CALL_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BBE, 'Call Control Point')
|
||||
GATT_CALL_CONTROL_POINT_OPTIONAL_OPCODES_CHARACTERISTIC = UUID.from_16_bits(0x2BBF, 'Call Control Point Optional Opcodes')
|
||||
GATT_TERMINATION_REASON_CHARACTERISTIC = UUID.from_16_bits(0x2BC0, 'Termination Reason')
|
||||
GATT_INCOMING_CALL_CHARACTERISTIC = UUID.from_16_bits(0x2BC1, 'Incoming Call')
|
||||
GATT_CALL_FRIENDLY_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BC2, 'Call Friendly Name')
|
||||
|
||||
# Microphone Control Service (MICS)
|
||||
GATT_MUTE_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Mute')
|
||||
@@ -275,6 +275,11 @@ GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCC, 'Sou
|
||||
GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts')
|
||||
GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts')
|
||||
|
||||
# Hearing Access Service
|
||||
GATT_HEARING_AID_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2BDA, 'Hearing Aid Features')
|
||||
GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BDB, 'Hearing Aid Preset Control Point')
|
||||
GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC = UUID.from_16_bits(0x2BDC, 'Active Preset Index')
|
||||
|
||||
# ASHA Service
|
||||
GATT_ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
|
||||
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
|
||||
|
||||
+5
-25
@@ -68,7 +68,7 @@ from .att import (
|
||||
ATT_Error,
|
||||
)
|
||||
from . import core
|
||||
from .core import UUID, InvalidStateError, ProtocolError
|
||||
from .core import UUID, InvalidStateError
|
||||
from .gatt import (
|
||||
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
|
||||
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
|
||||
@@ -345,12 +345,7 @@ class Client:
|
||||
self.mtu_exchange_done = True
|
||||
response = await self.send_request(ATT_Exchange_MTU_Request(client_rx_mtu=mtu))
|
||||
if response.op_code == ATT_ERROR_RESPONSE:
|
||||
raise ProtocolError(
|
||||
response.error_code,
|
||||
'att',
|
||||
ATT_PDU.error_name(response.error_code),
|
||||
response,
|
||||
)
|
||||
raise ATT_Error(error_code=response.error_code, message=response)
|
||||
|
||||
# Compute the final MTU
|
||||
self.connection.att_mtu = min(mtu, response.server_rx_mtu)
|
||||
@@ -936,12 +931,7 @@ class Client:
|
||||
if response is None:
|
||||
raise TimeoutError('read timeout')
|
||||
if response.op_code == ATT_ERROR_RESPONSE:
|
||||
raise ProtocolError(
|
||||
response.error_code,
|
||||
'att',
|
||||
ATT_PDU.error_name(response.error_code),
|
||||
response,
|
||||
)
|
||||
raise ATT_Error(error_code=response.error_code, message=response)
|
||||
|
||||
# If the value is the max size for the MTU, try to read more unless the caller
|
||||
# specifically asked not to do that
|
||||
@@ -963,12 +953,7 @@ class Client:
|
||||
ATT_INVALID_OFFSET_ERROR,
|
||||
):
|
||||
break
|
||||
raise ProtocolError(
|
||||
response.error_code,
|
||||
'att',
|
||||
ATT_PDU.error_name(response.error_code),
|
||||
response,
|
||||
)
|
||||
raise ATT_Error(error_code=response.error_code, message=response)
|
||||
|
||||
part = response.part_attribute_value
|
||||
attribute_value += part
|
||||
@@ -1061,12 +1046,7 @@ class Client:
|
||||
)
|
||||
)
|
||||
if response.op_code == ATT_ERROR_RESPONSE:
|
||||
raise ProtocolError(
|
||||
response.error_code,
|
||||
'att',
|
||||
ATT_PDU.error_name(response.error_code),
|
||||
response,
|
||||
)
|
||||
raise ATT_Error(error_code=response.error_code, message=response)
|
||||
else:
|
||||
await self.send_command(
|
||||
ATT_Write_Command(
|
||||
|
||||
+14
-6
@@ -915,7 +915,7 @@ class Server(EventEmitter):
|
||||
See Bluetooth spec Vol 3, Part F - 3.4.5.1 Write Request
|
||||
'''
|
||||
|
||||
# Check that the attribute exists
|
||||
# Check that the attribute exists
|
||||
attribute = self.get_attribute(request.attribute_handle)
|
||||
if attribute is None:
|
||||
self.send_response(
|
||||
@@ -942,11 +942,19 @@ class Server(EventEmitter):
|
||||
)
|
||||
return
|
||||
|
||||
# Accept the value
|
||||
await attribute.write_value(connection, request.attribute_value)
|
||||
|
||||
# Done
|
||||
self.send_response(connection, ATT_Write_Response())
|
||||
try:
|
||||
# Accept the value
|
||||
await attribute.write_value(connection, request.attribute_value)
|
||||
except ATT_Error as error:
|
||||
response = ATT_Error_Response(
|
||||
request_opcode_in_error=request.op_code,
|
||||
attribute_handle_in_error=request.attribute_handle,
|
||||
error_code=error.error_code,
|
||||
)
|
||||
else:
|
||||
# Done
|
||||
response = ATT_Write_Response()
|
||||
self.send_response(connection, response)
|
||||
|
||||
@AsyncRunner.run_in_task()
|
||||
async def on_att_write_command(self, connection, request):
|
||||
|
||||
+272
-7
@@ -267,6 +267,19 @@ HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_V2_EVENT = 0X26
|
||||
HCI_LE_PERIODIC_ADVERTISING_SUBEVENT_DATA_REQUEST_EVENT = 0X27
|
||||
HCI_LE_PERIODIC_ADVERTISING_RESPONSE_REPORT_EVENT = 0X28
|
||||
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT = 0X29
|
||||
HCI_LE_READ_ALL_REMOTE_FEATURES_COMPLETE_EVENT = 0x2A
|
||||
HCI_LE_CIS_ESTABLISHED_V2_EVENT = 0x2B
|
||||
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMPLETE_EVENT = 0x2C
|
||||
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMPLETE_EVENT = 0x2D
|
||||
HCI_LE_CS_SECURITY_ENABLE_COMPLETE_EVENT = 0x2E
|
||||
HCI_LE_CS_CONFIG_COMPLETE_EVENT = 0x2F
|
||||
HCI_LE_CS_PROCEDURE_ENABLE_EVENT = 0x30
|
||||
HCI_LE_CS_SUBEVENT_RESULT_EVENT = 0x31
|
||||
HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT = 0x32
|
||||
HCI_LE_CS_TEST_END_COMPLETE_EVENT = 0x33
|
||||
HCI_LE_MONITORED_ADVERTISERS_REPORT_EVENT = 0x34
|
||||
HCI_LE_FRAME_SPACE_UPDATE_EVENT = 0x35
|
||||
|
||||
|
||||
|
||||
# HCI Command
|
||||
@@ -573,11 +586,36 @@ HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_c
|
||||
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D)
|
||||
HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E)
|
||||
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x007F)
|
||||
HCI_LE_SET_DECISION_DATA_COMMAND = hci_command_op_code(0x08, 0x0080)
|
||||
HCI_LE_SET_DECISION_INSTRUCTIONS_COMMAND = hci_command_op_code(0x08, 0x0081)
|
||||
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND = hci_command_op_code(0x08, 0x0082)
|
||||
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND = hci_command_op_code(0x08, 0x0083)
|
||||
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND = hci_command_op_code(0x08, 0x0084)
|
||||
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND = hci_command_op_code(0x08, 0x0085)
|
||||
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x0086)
|
||||
HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND = hci_command_op_code(0x08, 0x0087)
|
||||
HCI_LE_READ_ALL_REMOTE_FEATURES_COMMAND = hci_command_op_code(0x08, 0x0088)
|
||||
HCI_LE_CS_READ_LOCAL_SUPPORTED_CAPABILITIES_COMMAND = hci_command_op_code(0x08, 0x0089)
|
||||
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMMAND = hci_command_op_code(0x08, 0x008A)
|
||||
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES = hci_command_op_code(0x08, 0x008B)
|
||||
HCI_LE_CS_SECURITY_ENABLE_COMMAND = hci_command_op_code(0x08, 0x008C)
|
||||
HCI_LE_CS_SET_DEFAULT_SETTINGS_COMMAND = hci_command_op_code(0x08, 0x008D)
|
||||
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMMAND = hci_command_op_code(0x08, 0x008E)
|
||||
HCI_LE_CS_WRITE_CACHED_REMOTE_FAE_TABLE_COMMAND = hci_command_op_code(0x08, 0x008F)
|
||||
HCI_LE_CS_CREATE_CONFIG_COMMAND = hci_command_op_code(0x08, 0x0090)
|
||||
HCI_LE_CS_REMOVE_CONFIG_COMMAND = hci_command_op_code(0x08, 0x0091)
|
||||
HCI_LE_CS_SET_CHANNEL_CLASSIFICATION_COMMAND = hci_command_op_code(0x08, 0x0092)
|
||||
HCI_LE_CS_SET_PROCEDURE_PARAMETERS_COMMAND = hci_command_op_code(0x08, 0x0093)
|
||||
HCI_LE_CS_PROCEDURE_ENABLE_COMMAND = hci_command_op_code(0x08, 0x0094)
|
||||
HCI_LE_CS_TEST_COMMAND = hci_command_op_code(0x08, 0x0095)
|
||||
HCI_LE_CS_TEST_END_COMMAND = hci_command_op_code(0x08, 0x0096)
|
||||
HCI_LE_SET_HOST_FEATURE_V2_COMMAND = hci_command_op_code(0x08, 0x0097)
|
||||
HCI_LE_ADD_DEVICE_TO_MONITORED_ADVERTISERS_LIST_COMMAND = hci_command_op_code(0x08, 0x0098)
|
||||
HCI_LE_REMOVE_DEVICE_FROM_MONITORED_ADVERTISERS_LIST_COMMAND = hci_command_op_code(0x08, 0x0099)
|
||||
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND = hci_command_op_code(0x08, 0x009A)
|
||||
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND = hci_command_op_code(0x08, 0x009B)
|
||||
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND = hci_command_op_code(0x08, 0x009C)
|
||||
HCI_LE_FRAME_SPACE_UPDATE_COMMAND = hci_command_op_code(0x08, 0x009D)
|
||||
|
||||
|
||||
# HCI Error Codes
|
||||
@@ -1150,8 +1188,16 @@ class LeFeature(OpenIntEnum):
|
||||
CHANNEL_CLASSIFICATION = 39
|
||||
ADVERTISING_CODING_SELECTION = 40
|
||||
ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 41
|
||||
DECISION_BASED_ADVERTISING_FILTERING = 42
|
||||
PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 43
|
||||
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 44
|
||||
UNSEGMENTED_FRAMED_MODE = 45
|
||||
CHANNEL_SOUNDING = 46
|
||||
CHANNEL_SOUNDING_HOST_SUPPORT = 47
|
||||
CHANNEL_SOUNDING_TONE_QUALITY_INDICATION = 48
|
||||
LL_EXTENDED_FEATURE_SET = 63
|
||||
MONITORING_ADVERTISERS = 64
|
||||
FRAME_SPACE_UPDATE = 65
|
||||
|
||||
class LeFeatureMask(enum.IntFlag):
|
||||
LE_ENCRYPTION = 1 << LeFeature.LE_ENCRYPTION
|
||||
@@ -1196,8 +1242,16 @@ class LeFeatureMask(enum.IntFlag):
|
||||
CHANNEL_CLASSIFICATION = 1 << LeFeature.CHANNEL_CLASSIFICATION
|
||||
ADVERTISING_CODING_SELECTION = 1 << LeFeature.ADVERTISING_CODING_SELECTION
|
||||
ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 1 << LeFeature.ADVERTISING_CODING_SELECTION_HOST_SUPPORT
|
||||
DECISION_BASED_ADVERTISING_FILTERING = 1 << LeFeature.DECISION_BASED_ADVERTISING_FILTERING
|
||||
PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 1 << LeFeature.PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER
|
||||
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 1 << LeFeature.PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER
|
||||
UNSEGMENTED_FRAMED_MODE = 1 << LeFeature.UNSEGMENTED_FRAMED_MODE
|
||||
CHANNEL_SOUNDING = 1 << LeFeature.CHANNEL_SOUNDING
|
||||
CHANNEL_SOUNDING_HOST_SUPPORT = 1 << LeFeature.CHANNEL_SOUNDING_HOST_SUPPORT
|
||||
CHANNEL_SOUNDING_TONE_QUALITY_INDICATION = 1 << LeFeature.CHANNEL_SOUNDING_TONE_QUALITY_INDICATION
|
||||
LL_EXTENDED_FEATURE_SET = 1 << LeFeature.LL_EXTENDED_FEATURE_SET
|
||||
MONITORING_ADVERTISERS = 1 << LeFeature.MONITORING_ADVERTISERS
|
||||
FRAME_SPACE_UPDATE = 1 << LeFeature.FRAME_SPACE_UPDATE
|
||||
|
||||
class LmpFeature(enum.IntEnum):
|
||||
# Page 0 (Legacy LMP features)
|
||||
@@ -1565,12 +1619,16 @@ class HCI_Object:
|
||||
# This is an array field, starting with a 1-byte item count.
|
||||
item_count = data[offset]
|
||||
offset += 1
|
||||
# Set fields first, because item_count might be 0.
|
||||
for sub_field_name, _ in field:
|
||||
result[sub_field_name] = []
|
||||
|
||||
for _ in range(item_count):
|
||||
for sub_field_name, sub_field_type in field:
|
||||
value, size = HCI_Object.parse_field(
|
||||
data, offset, sub_field_type
|
||||
)
|
||||
result.setdefault(sub_field_name, []).append(value)
|
||||
result[sub_field_name].append(value)
|
||||
offset += size
|
||||
continue
|
||||
|
||||
@@ -2982,6 +3040,27 @@ class HCI_Write_Inquiry_Scan_Activity_Command(HCI_Command):
|
||||
'''
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
('status', STATUS_SPEC),
|
||||
('authentication_enable', 1),
|
||||
]
|
||||
)
|
||||
class HCI_Read_Authentication_Enable_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.3.23 Read Authentication Enable Command
|
||||
'''
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command([('authentication_enable', 1)])
|
||||
class HCI_Write_Authentication_Enable_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.3.24 Write Authentication Enable Command
|
||||
'''
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
@@ -3022,7 +3101,12 @@ class HCI_Write_Voice_Setting_Command(HCI_Command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command()
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
('status', STATUS_SPEC),
|
||||
('synchronous_flow_control_enable', 1),
|
||||
]
|
||||
)
|
||||
class HCI_Read_Synchronous_Flow_Control_Enable_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.3.36 Read Synchronous Flow Control Enable Command
|
||||
@@ -3191,7 +3275,13 @@ class HCI_Set_Event_Mask_Page_2_Command(HCI_Command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command()
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
('status', STATUS_SPEC),
|
||||
('le_supported_host', 1),
|
||||
('unused', 1),
|
||||
]
|
||||
)
|
||||
class HCI_Read_LE_Host_Support_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.3.78 Read LE Host Support Command
|
||||
@@ -3324,13 +3414,39 @@ class HCI_Read_BD_ADDR_Command(HCI_Command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command()
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
("status", STATUS_SPEC),
|
||||
[("standard_codec_ids", 1)],
|
||||
[("vendor_specific_codec_ids", 4)],
|
||||
]
|
||||
)
|
||||
class HCI_Read_Local_Supported_Codecs_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.4.8 Read Local Supported Codecs Command
|
||||
'''
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
("status", STATUS_SPEC),
|
||||
[("standard_codec_ids", 1), ("standard_codec_transports", 1)],
|
||||
[("vendor_specific_codec_ids", 4), ("vendor_specific_codec_transports", 1)],
|
||||
]
|
||||
)
|
||||
class HCI_Read_Local_Supported_Codecs_V2_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.4.8 Read Local Supported Codecs Command
|
||||
'''
|
||||
|
||||
class Transport(OpenIntEnum):
|
||||
BR_EDR_ACL = 0x00
|
||||
BR_EDR_SCO = 0x01
|
||||
LE_CIS = 0x02
|
||||
LE_BIS = 0x03
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
fields=[('handle', 2)],
|
||||
@@ -3488,7 +3604,12 @@ class HCI_LE_Set_Advertising_Parameters_Command(HCI_Command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command()
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
('status', STATUS_SPEC),
|
||||
('tx_power_level', 1),
|
||||
]
|
||||
)
|
||||
class HCI_LE_Read_Advertising_Physical_Channel_Tx_Power_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.8.6 LE Read Advertising Physical Channel Tx Power Command
|
||||
@@ -3612,7 +3733,12 @@ class HCI_LE_Create_Connection_Cancel_Command(HCI_Command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command()
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
('status', STATUS_SPEC),
|
||||
('filter_accept_list_size', 1),
|
||||
]
|
||||
)
|
||||
class HCI_LE_Read_Filter_Accept_List_Size_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.8.14 LE Read Filter Accept List Size Command
|
||||
@@ -3723,7 +3849,12 @@ class HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(HCI_Command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command()
|
||||
@HCI_Command.command(
|
||||
return_parameters_fields=[
|
||||
('status', STATUS_SPEC),
|
||||
('le_states', 8),
|
||||
]
|
||||
)
|
||||
class HCI_LE_Read_Supported_States_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.8.27 LE Read Supported States Command
|
||||
@@ -4698,6 +4829,102 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
|
||||
reason: int
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
fields=[
|
||||
('big_handle', 1),
|
||||
('advertising_handle', 1),
|
||||
('num_bis', 1),
|
||||
('sdu_interval', 3),
|
||||
('max_sdu', 2),
|
||||
('max_transport_latency', 2),
|
||||
('rtn', 1),
|
||||
('phy', 1),
|
||||
('packing', 1),
|
||||
('framing', 1),
|
||||
('encryption', 1),
|
||||
('broadcast_code', 16),
|
||||
],
|
||||
)
|
||||
class HCI_LE_Create_BIG_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.8.103 LE Create BIG command
|
||||
'''
|
||||
|
||||
big_handle: int
|
||||
advertising_handle: int
|
||||
num_bis: int
|
||||
sdu_interval: int
|
||||
max_sdu: int
|
||||
max_transport_latency: int
|
||||
rtn: int
|
||||
phy: int
|
||||
packing: int
|
||||
framing: int
|
||||
encryption: int
|
||||
broadcast_code: int
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
fields=[
|
||||
('big_handle', 1),
|
||||
('reason', {'size': 1, 'mapper': HCI_Constant.error_name}),
|
||||
],
|
||||
)
|
||||
class HCI_LE_Terminate_BIG_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.8.105 LE Terminate BIG command
|
||||
'''
|
||||
|
||||
big_handle: int
|
||||
reason: int
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
fields=[
|
||||
('big_handle', 1),
|
||||
('sync_handle', 2),
|
||||
('encryption', 1),
|
||||
('broadcast_code', 16),
|
||||
('mse', 1),
|
||||
('big_sync_timeout', 2),
|
||||
[('bis', 1)],
|
||||
],
|
||||
)
|
||||
class HCI_LE_BIG_Create_Sync_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.8.106 LE BIG Create Sync command
|
||||
'''
|
||||
|
||||
big_handle: int
|
||||
sync_handle: int
|
||||
encryption: int
|
||||
broadcast_code: int
|
||||
mse: int
|
||||
big_sync_timeout: int
|
||||
bis: List[int]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
fields=[
|
||||
('big_handle', 1),
|
||||
],
|
||||
return_parameters_fields=[
|
||||
('status', STATUS_SPEC),
|
||||
('big_handle', 2),
|
||||
],
|
||||
)
|
||||
class HCI_LE_BIG_Terminate_Sync_Command(HCI_Command):
|
||||
'''
|
||||
See Bluetooth spec @ 7.8.107. LE BIG Terminate Sync command
|
||||
'''
|
||||
|
||||
big_handle: int
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Command.command(
|
||||
fields=[
|
||||
@@ -5533,6 +5760,27 @@ class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event):
|
||||
'''
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_LE_Meta_Event.event(
|
||||
[
|
||||
('status', STATUS_SPEC),
|
||||
('connection_handle', 2),
|
||||
('service_data', 2),
|
||||
('sync_handle', 2),
|
||||
('advertising_sid', 1),
|
||||
('advertiser_address_type', Address.ADDRESS_TYPE_SPEC),
|
||||
('advertiser_address', Address.parse_address_preceded_by_type),
|
||||
('advertiser_phy', 1),
|
||||
('periodic_advertising_interval', 2),
|
||||
('advertiser_clock_accuracy', 1),
|
||||
]
|
||||
)
|
||||
class HCI_LE_Periodic_Advertising_Sync_Transfer_Received_Event(HCI_LE_Meta_Event):
|
||||
'''
|
||||
See Bluetooth spec @ 7.7.65.24 LE Periodic Advertising Sync Transfer Received Event
|
||||
'''
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_LE_Meta_Event.event(
|
||||
[
|
||||
@@ -6225,6 +6473,23 @@ class HCI_Synchronous_Connection_Changed_Event(HCI_Event):
|
||||
'''
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Event.event(
|
||||
[
|
||||
('status', STATUS_SPEC),
|
||||
('connection_handle', 2),
|
||||
('max_tx_latency', 2),
|
||||
('max_rx_latency', 2),
|
||||
('min_remote_timeout', 2),
|
||||
('min_local_timeout', 2),
|
||||
]
|
||||
)
|
||||
class HCI_Sniff_Subrating_Event(HCI_Event):
|
||||
'''
|
||||
See Bluetooth spec @ 7.7.37 Sniff Subrating Event
|
||||
'''
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@HCI_Event.event(
|
||||
[
|
||||
|
||||
+16
-6
@@ -171,7 +171,7 @@ class Host(AbortableEventEmitter):
|
||||
self.cis_links = {} # CIS links, by connection handle
|
||||
self.sco_links = {} # SCO links, by connection handle
|
||||
self.pending_command = None
|
||||
self.pending_response = None
|
||||
self.pending_response: Optional[asyncio.Future[Any]] = None
|
||||
self.number_of_supported_advertising_sets = 0
|
||||
self.maximum_advertising_data_length = 31
|
||||
self.local_version = None
|
||||
@@ -514,7 +514,9 @@ class Host(AbortableEventEmitter):
|
||||
if self.hci_sink:
|
||||
self.hci_sink.on_packet(bytes(packet))
|
||||
|
||||
async def send_command(self, command, check_result=False):
|
||||
async def send_command(
|
||||
self, command, check_result=False, response_timeout: Optional[int] = None
|
||||
):
|
||||
# Wait until we can send (only one pending command at a time)
|
||||
async with self.command_semaphore:
|
||||
assert self.pending_command is None
|
||||
@@ -526,12 +528,13 @@ class Host(AbortableEventEmitter):
|
||||
|
||||
try:
|
||||
self.send_hci_packet(command)
|
||||
response = await self.pending_response
|
||||
await asyncio.wait_for(self.pending_response, timeout=response_timeout)
|
||||
response = self.pending_response.result()
|
||||
|
||||
# Check the return parameters if required
|
||||
if check_result:
|
||||
if isinstance(response, hci.HCI_Command_Status_Event):
|
||||
status = response.status
|
||||
status = response.status # type: ignore[attr-defined]
|
||||
elif isinstance(response.return_parameters, int):
|
||||
status = response.return_parameters
|
||||
elif isinstance(response.return_parameters, bytes):
|
||||
@@ -625,14 +628,21 @@ class Host(AbortableEventEmitter):
|
||||
|
||||
# Packet Sink protocol (packets coming from the controller via HCI)
|
||||
def on_packet(self, packet: bytes) -> None:
|
||||
hci_packet = hci.HCI_Packet.from_bytes(packet)
|
||||
try:
|
||||
hci_packet = hci.HCI_Packet.from_bytes(packet)
|
||||
except Exception as error:
|
||||
logger.warning(f'!!! error parsing packet from bytes: {error}')
|
||||
return
|
||||
|
||||
if self.ready or (
|
||||
isinstance(hci_packet, hci.HCI_Command_Complete_Event)
|
||||
and hci_packet.command_opcode == hci.HCI_RESET_COMMAND
|
||||
):
|
||||
self.on_hci_packet(hci_packet)
|
||||
else:
|
||||
logger.debug('reset not done, ignoring packet from controller')
|
||||
logger.debug(
|
||||
f'reset not done, ignoring packet from controller: {hci_packet}'
|
||||
)
|
||||
|
||||
def on_transport_lost(self):
|
||||
# Called by the source when the transport has been lost.
|
||||
|
||||
@@ -0,0 +1,519 @@
|
||||
# Copyright 2024 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""LE Audio - Audio Input Control Service"""
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import logging
|
||||
import struct
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from bumble import gatt
|
||||
from bumble.device import Connection
|
||||
from bumble.att import ATT_Error
|
||||
from bumble.gatt import (
|
||||
Characteristic,
|
||||
DelegatedCharacteristicAdapter,
|
||||
TemplateService,
|
||||
CharacteristicValue,
|
||||
PackedCharacteristicAdapter,
|
||||
GATT_AUDIO_INPUT_CONTROL_SERVICE,
|
||||
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
|
||||
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
|
||||
GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC,
|
||||
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC,
|
||||
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
|
||||
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
|
||||
)
|
||||
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
|
||||
from bumble.utils import OpenIntEnum
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
CHANGE_COUNTER_MAX_VALUE = 0xFF
|
||||
GAIN_SETTINGS_MIN_VALUE = 0
|
||||
GAIN_SETTINGS_MAX_VALUE = 255
|
||||
|
||||
|
||||
class ErrorCode(OpenIntEnum):
|
||||
'''
|
||||
Cf. 1.6 Application error codes
|
||||
'''
|
||||
|
||||
INVALID_CHANGE_COUNTER = 0x80
|
||||
OPCODE_NOT_SUPPORTED = 0x81
|
||||
MUTE_DISABLED = 0x82
|
||||
VALUE_OUT_OF_RANGE = 0x83
|
||||
GAIN_MODE_CHANGE_NOT_ALLOWED = 0x84
|
||||
|
||||
|
||||
class Mute(OpenIntEnum):
|
||||
'''
|
||||
Cf. 2.2.1.2 Mute Field
|
||||
'''
|
||||
|
||||
NOT_MUTED = 0x00
|
||||
MUTED = 0x01
|
||||
DISABLED = 0x02
|
||||
|
||||
|
||||
class GainMode(OpenIntEnum):
|
||||
'''
|
||||
Cf. 2.2.1.3 Gain Mode
|
||||
'''
|
||||
|
||||
MANUAL_ONLY = 0x00
|
||||
AUTOMATIC_ONLY = 0x01
|
||||
MANUAL = 0x02
|
||||
AUTOMATIC = 0x03
|
||||
|
||||
|
||||
class AudioInputStatus(OpenIntEnum):
|
||||
'''
|
||||
Cf. 3.4 Audio Input Status
|
||||
'''
|
||||
|
||||
INATIVE = 0x00
|
||||
ACTIVE = 0x01
|
||||
|
||||
|
||||
class AudioInputControlPointOpCode(OpenIntEnum):
|
||||
'''
|
||||
Cf. 3.5.1 Audio Input Control Point procedure requirements
|
||||
'''
|
||||
|
||||
SET_GAIN_SETTING = 0x00
|
||||
UNMUTE = 0x02
|
||||
MUTE = 0x03
|
||||
SET_MANUAL_GAIN_MODE = 0x04
|
||||
SET_AUTOMATIC_GAIN_MODE = 0x05
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclass
|
||||
class AudioInputState:
|
||||
'''
|
||||
Cf. 2.2.1 Audio Input State
|
||||
'''
|
||||
|
||||
gain_settings: int = 0
|
||||
mute: Mute = Mute.NOT_MUTED
|
||||
gain_mode: GainMode = GainMode.MANUAL
|
||||
change_counter: int = 0
|
||||
attribute_value: Optional[CharacteristicValue] = None
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return bytes(
|
||||
[self.gain_settings, self.mute, self.gain_mode, self.change_counter]
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
gain_settings, mute, gain_mode, change_counter = struct.unpack("BBBB", data)
|
||||
return cls(gain_settings, mute, gain_mode, change_counter)
|
||||
|
||||
def update_gain_settings_unit(self, gain_settings_unit: int) -> None:
|
||||
self.gain_settings_unit = gain_settings_unit
|
||||
|
||||
def increment_gain_settings(self, gain_settings_unit: int) -> None:
|
||||
self.gain_settings += gain_settings_unit
|
||||
self.increment_change_counter()
|
||||
|
||||
def decrement_gain_settings(self) -> None:
|
||||
self.gain_settings -= self.gain_settings_unit
|
||||
self.increment_change_counter()
|
||||
|
||||
def increment_change_counter(self):
|
||||
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
|
||||
|
||||
async def notify_subscribers_via_connection(self, connection: Connection) -> None:
|
||||
assert self.attribute_value is not None
|
||||
await connection.device.notify_subscribers(
|
||||
attribute=self.attribute_value, value=bytes(self)
|
||||
)
|
||||
|
||||
def on_read(self, _connection: Optional[Connection]) -> bytes:
|
||||
return bytes(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GainSettingsProperties:
|
||||
'''
|
||||
Cf. 3.2 Gain Settings Properties
|
||||
'''
|
||||
|
||||
gain_settings_unit: int = 1
|
||||
gain_settings_minimum: int = GAIN_SETTINGS_MIN_VALUE
|
||||
gain_settings_maximum: int = GAIN_SETTINGS_MAX_VALUE
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
(gain_settings_unit, gain_settings_minimum, gain_settings_maximum) = (
|
||||
struct.unpack('BBB', data)
|
||||
)
|
||||
GainSettingsProperties(
|
||||
gain_settings_unit, gain_settings_minimum, gain_settings_maximum
|
||||
)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return bytes(
|
||||
[
|
||||
self.gain_settings_unit,
|
||||
self.gain_settings_minimum,
|
||||
self.gain_settings_maximum,
|
||||
]
|
||||
)
|
||||
|
||||
def on_read(self, _connection: Optional[Connection]) -> bytes:
|
||||
return bytes(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AudioInputControlPoint:
|
||||
'''
|
||||
Cf. 3.5.2 Audio Input Control Point
|
||||
'''
|
||||
|
||||
audio_input_state: AudioInputState
|
||||
gain_settings_properties: GainSettingsProperties
|
||||
|
||||
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
|
||||
assert connection
|
||||
|
||||
opcode = AudioInputControlPointOpCode(value[0])
|
||||
|
||||
if opcode == AudioInputControlPointOpCode.SET_GAIN_SETTING:
|
||||
gain_settings_operand = value[2]
|
||||
await self._set_gain_settings(connection, gain_settings_operand)
|
||||
elif opcode == AudioInputControlPointOpCode.UNMUTE:
|
||||
await self._unmute(connection)
|
||||
elif opcode == AudioInputControlPointOpCode.MUTE:
|
||||
change_counter_operand = value[1]
|
||||
await self._mute(connection, change_counter_operand)
|
||||
elif opcode == AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE:
|
||||
await self._set_manual_gain_mode(connection)
|
||||
elif opcode == AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE:
|
||||
await self._set_automatic_gain_mode(connection)
|
||||
else:
|
||||
logger.error(f"OpCode value is incorrect: {opcode}")
|
||||
raise ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED)
|
||||
|
||||
async def _set_gain_settings(
|
||||
self, connection: Connection, gain_settings_operand: int
|
||||
) -> None:
|
||||
'''Cf. 3.5.2.1 Set Gain Settings Procedure'''
|
||||
|
||||
gain_mode = self.audio_input_state.gain_mode
|
||||
|
||||
logger.error(f"set_gain_setting: gain_mode: {gain_mode}")
|
||||
if not (gain_mode == GainMode.MANUAL or gain_mode == GainMode.MANUAL_ONLY):
|
||||
logger.warning(
|
||||
"GainMode should be either MANUAL or MANUAL_ONLY Cf Spec Audio Input Control Service 3.5.2.1"
|
||||
)
|
||||
return
|
||||
|
||||
if (
|
||||
gain_settings_operand < self.gain_settings_properties.gain_settings_minimum
|
||||
or gain_settings_operand
|
||||
> self.gain_settings_properties.gain_settings_maximum
|
||||
):
|
||||
logger.error("gain_seetings value out of range")
|
||||
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
|
||||
|
||||
if self.audio_input_state.gain_settings != gain_settings_operand:
|
||||
self.audio_input_state.gain_settings = gain_settings_operand
|
||||
await self.audio_input_state.notify_subscribers_via_connection(connection)
|
||||
|
||||
async def _unmute(self, connection: Connection):
|
||||
'''Cf. 3.5.2.2 Unmute procedure'''
|
||||
|
||||
logger.error(f'unmute: {self.audio_input_state.mute}')
|
||||
mute = self.audio_input_state.mute
|
||||
if mute == Mute.DISABLED:
|
||||
logger.error("unmute: Cannot change Mute value, Mute state is DISABLED")
|
||||
raise ATT_Error(ErrorCode.MUTE_DISABLED)
|
||||
|
||||
if mute == Mute.NOT_MUTED:
|
||||
return
|
||||
|
||||
self.audio_input_state.mute = Mute.NOT_MUTED
|
||||
self.audio_input_state.increment_change_counter()
|
||||
await self.audio_input_state.notify_subscribers_via_connection(connection)
|
||||
|
||||
async def _mute(self, connection: Connection, change_counter_operand: int) -> None:
|
||||
'''Cf. 3.5.5.2 Mute procedure'''
|
||||
|
||||
change_counter = self.audio_input_state.change_counter
|
||||
mute = self.audio_input_state.mute
|
||||
if mute == Mute.DISABLED:
|
||||
logger.error("mute: Cannot change Mute value, Mute state is DISABLED")
|
||||
raise ATT_Error(ErrorCode.MUTE_DISABLED)
|
||||
|
||||
if change_counter != change_counter_operand:
|
||||
raise ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
|
||||
|
||||
if mute == Mute.MUTED:
|
||||
return
|
||||
|
||||
self.audio_input_state.mute = Mute.MUTED
|
||||
self.audio_input_state.increment_change_counter()
|
||||
await self.audio_input_state.notify_subscribers_via_connection(connection)
|
||||
|
||||
async def _set_manual_gain_mode(self, connection: Connection) -> None:
|
||||
'''Cf. 3.5.2.4 Set Manual Gain Mode procedure'''
|
||||
|
||||
gain_mode = self.audio_input_state.gain_mode
|
||||
if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
|
||||
logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
|
||||
raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)
|
||||
|
||||
if gain_mode == GainMode.MANUAL:
|
||||
return
|
||||
|
||||
self.audio_input_state.gain_mode = GainMode.MANUAL
|
||||
self.audio_input_state.increment_change_counter()
|
||||
await self.audio_input_state.notify_subscribers_via_connection(connection)
|
||||
|
||||
async def _set_automatic_gain_mode(self, connection: Connection) -> None:
|
||||
'''Cf. 3.5.2.5 Set Automatic Gain Mode'''
|
||||
|
||||
gain_mode = self.audio_input_state.gain_mode
|
||||
if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
|
||||
logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
|
||||
raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)
|
||||
|
||||
if gain_mode == GainMode.AUTOMATIC:
|
||||
return
|
||||
|
||||
self.audio_input_state.gain_mode = GainMode.AUTOMATIC
|
||||
self.audio_input_state.increment_change_counter()
|
||||
await self.audio_input_state.notify_subscribers_via_connection(connection)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AudioInputDescription:
|
||||
'''
|
||||
Cf. 3.6 Audio Input Description
|
||||
'''
|
||||
|
||||
audio_input_description: str = "Bluetooth"
|
||||
attribute_value: Optional[CharacteristicValue] = None
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
return cls(audio_input_description=data.decode('utf-8'))
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.audio_input_description.encode('utf-8')
|
||||
|
||||
def on_read(self, _connection: Optional[Connection]) -> bytes:
|
||||
return self.audio_input_description.encode('utf-8')
|
||||
|
||||
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
|
||||
assert connection
|
||||
assert self.attribute_value
|
||||
|
||||
self.audio_input_description = value.decode('utf-8')
|
||||
await connection.device.notify_subscribers(
|
||||
attribute=self.attribute_value, value=value
|
||||
)
|
||||
|
||||
|
||||
class AICSService(TemplateService):
|
||||
UUID = GATT_AUDIO_INPUT_CONTROL_SERVICE
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
audio_input_state: Optional[AudioInputState] = None,
|
||||
gain_settings_properties: Optional[GainSettingsProperties] = None,
|
||||
audio_input_type: str = "local",
|
||||
audio_input_status: Optional[AudioInputStatus] = None,
|
||||
audio_input_description: Optional[AudioInputDescription] = None,
|
||||
):
|
||||
self.audio_input_state = (
|
||||
AudioInputState() if audio_input_state is None else audio_input_state
|
||||
)
|
||||
self.gain_settings_properties = (
|
||||
GainSettingsProperties()
|
||||
if gain_settings_properties is None
|
||||
else gain_settings_properties
|
||||
)
|
||||
self.audio_input_status = (
|
||||
AudioInputStatus.ACTIVE
|
||||
if audio_input_status is None
|
||||
else audio_input_status
|
||||
)
|
||||
self.audio_input_description = (
|
||||
AudioInputDescription()
|
||||
if audio_input_description is None
|
||||
else audio_input_description
|
||||
)
|
||||
|
||||
self.audio_input_control_point: AudioInputControlPoint = AudioInputControlPoint(
|
||||
self.audio_input_state, self.gain_settings_properties
|
||||
)
|
||||
|
||||
self.audio_input_state_characteristic = DelegatedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
uuid=GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
|
||||
properties=Characteristic.Properties.READ
|
||||
| Characteristic.Properties.NOTIFY,
|
||||
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
|
||||
value=CharacteristicValue(read=self.audio_input_state.on_read),
|
||||
),
|
||||
encode=lambda value: bytes(value),
|
||||
)
|
||||
self.audio_input_state.attribute_value = (
|
||||
self.audio_input_state_characteristic.value
|
||||
)
|
||||
|
||||
self.gain_settings_properties_characteristic = DelegatedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
uuid=GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
|
||||
properties=Characteristic.Properties.READ,
|
||||
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
|
||||
value=CharacteristicValue(read=self.gain_settings_properties.on_read),
|
||||
)
|
||||
)
|
||||
|
||||
self.audio_input_type_characteristic = Characteristic(
|
||||
uuid=GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC,
|
||||
properties=Characteristic.Properties.READ,
|
||||
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
|
||||
value=audio_input_type,
|
||||
)
|
||||
|
||||
self.audio_input_status_characteristic = Characteristic(
|
||||
uuid=GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC,
|
||||
properties=Characteristic.Properties.READ,
|
||||
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
|
||||
value=bytes([self.audio_input_status]),
|
||||
)
|
||||
|
||||
self.audio_input_control_point_characteristic = DelegatedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
uuid=GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
|
||||
properties=Characteristic.Properties.WRITE,
|
||||
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
|
||||
value=CharacteristicValue(
|
||||
write=self.audio_input_control_point.on_write
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
self.audio_input_description_characteristic = DelegatedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
uuid=GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
|
||||
properties=Characteristic.Properties.READ
|
||||
| Characteristic.Properties.NOTIFY
|
||||
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
|
||||
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
|
||||
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
|
||||
value=CharacteristicValue(
|
||||
write=self.audio_input_description.on_write,
|
||||
read=self.audio_input_description.on_read,
|
||||
),
|
||||
)
|
||||
)
|
||||
self.audio_input_description.attribute_value = (
|
||||
self.audio_input_control_point_characteristic.value
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
[
|
||||
self.audio_input_state_characteristic, # type: ignore
|
||||
self.gain_settings_properties_characteristic, # type: ignore
|
||||
self.audio_input_type_characteristic, # type: ignore
|
||||
self.audio_input_status_characteristic, # type: ignore
|
||||
self.audio_input_control_point_characteristic, # type: ignore
|
||||
self.audio_input_description_characteristic, # type: ignore
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Client
|
||||
# -----------------------------------------------------------------------------
|
||||
class AICSServiceProxy(ProfileServiceProxy):
|
||||
SERVICE_CLASS = AICSService
|
||||
|
||||
def __init__(self, service_proxy: ServiceProxy) -> None:
|
||||
self.service_proxy = service_proxy
|
||||
|
||||
if not (
|
||||
characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
|
||||
)
|
||||
):
|
||||
raise gatt.InvalidServiceError("Audio Input State Characteristic not found")
|
||||
self.audio_input_state = DelegatedCharacteristicAdapter(
|
||||
characteristic=characteristics[0], decode=AudioInputState.from_bytes
|
||||
)
|
||||
|
||||
if not (
|
||||
characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
|
||||
)
|
||||
):
|
||||
raise gatt.InvalidServiceError(
|
||||
"Gain Settings Attribute Characteristic not found"
|
||||
)
|
||||
self.gain_settings_properties = PackedCharacteristicAdapter(
|
||||
characteristics[0],
|
||||
'BBB',
|
||||
)
|
||||
|
||||
if not (
|
||||
characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC
|
||||
)
|
||||
):
|
||||
raise gatt.InvalidServiceError(
|
||||
"Audio Input Status Characteristic not found"
|
||||
)
|
||||
self.audio_input_status = PackedCharacteristicAdapter(
|
||||
characteristics[0],
|
||||
'B',
|
||||
)
|
||||
|
||||
if not (
|
||||
characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC
|
||||
)
|
||||
):
|
||||
raise gatt.InvalidServiceError(
|
||||
"Audio Input Control Point Characteristic not found"
|
||||
)
|
||||
self.audio_input_control_point = characteristics[0]
|
||||
|
||||
if not (
|
||||
characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC
|
||||
)
|
||||
):
|
||||
raise gatt.InvalidServiceError(
|
||||
"Audio Input Description Characteristic not found"
|
||||
)
|
||||
self.audio_input_description = characteristics[0]
|
||||
@@ -0,0 +1,665 @@
|
||||
# Copyright 2024 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import functools
|
||||
from bumble import att, gatt, gatt_client
|
||||
from bumble.core import InvalidArgumentError, InvalidStateError
|
||||
from bumble.device import Device, Connection
|
||||
from bumble.utils import AsyncRunner, OpenIntEnum
|
||||
from bumble.hci import Address
|
||||
from dataclasses import dataclass, field
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Set, Union
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
class ErrorCode(OpenIntEnum):
|
||||
'''See Hearing Access Service 2.4. Attribute Profile error codes.'''
|
||||
|
||||
INVALID_OPCODE = 0x80
|
||||
WRITE_NAME_NOT_ALLOWED = 0x81
|
||||
PRESET_SYNCHRONIZATION_NOT_SUPPORTED = 0x82
|
||||
PRESET_OPERATION_NOT_POSSIBLE = 0x83
|
||||
INVALID_PARAMETERS_LENGTH = 0x84
|
||||
|
||||
|
||||
class HearingAidType(OpenIntEnum):
|
||||
'''See Hearing Access Service 3.1. Hearing Aid Features.'''
|
||||
|
||||
BINAURAL_HEARING_AID = 0b00
|
||||
MONAURAL_HEARING_AID = 0b01
|
||||
BANDED_HEARING_AID = 0b10
|
||||
|
||||
|
||||
class PresetSynchronizationSupport(OpenIntEnum):
|
||||
'''See Hearing Access Service 3.1. Hearing Aid Features.'''
|
||||
|
||||
PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED = 0b0
|
||||
PRESET_SYNCHRONIZATION_IS_SUPPORTED = 0b1
|
||||
|
||||
|
||||
class IndependentPresets(OpenIntEnum):
|
||||
'''See Hearing Access Service 3.1. Hearing Aid Features.'''
|
||||
|
||||
IDENTICAL_PRESET_RECORD = 0b0
|
||||
DIFFERENT_PRESET_RECORD = 0b1
|
||||
|
||||
|
||||
class DynamicPresets(OpenIntEnum):
|
||||
'''See Hearing Access Service 3.1. Hearing Aid Features.'''
|
||||
|
||||
PRESET_RECORDS_DOES_NOT_CHANGE = 0b0
|
||||
PRESET_RECORDS_MAY_CHANGE = 0b1
|
||||
|
||||
|
||||
class WritablePresetsSupport(OpenIntEnum):
|
||||
'''See Hearing Access Service 3.1. Hearing Aid Features.'''
|
||||
|
||||
WRITABLE_PRESET_RECORDS_NOT_SUPPORTED = 0b0
|
||||
WRITABLE_PRESET_RECORDS_SUPPORTED = 0b1
|
||||
|
||||
|
||||
class HearingAidPresetControlPointOpcode(OpenIntEnum):
|
||||
'''See Hearing Access Service 3.3.1 Hearing Aid Preset Control Point operation requirements.'''
|
||||
|
||||
# fmt: off
|
||||
READ_PRESETS_REQUEST = 0x01
|
||||
READ_PRESET_RESPONSE = 0x02
|
||||
PRESET_CHANGED = 0x03
|
||||
WRITE_PRESET_NAME = 0x04
|
||||
SET_ACTIVE_PRESET = 0x05
|
||||
SET_NEXT_PRESET = 0x06
|
||||
SET_PREVIOUS_PRESET = 0x07
|
||||
SET_ACTIVE_PRESET_SYNCHRONIZED_LOCALLY = 0x08
|
||||
SET_NEXT_PRESET_SYNCHRONIZED_LOCALLY = 0x09
|
||||
SET_PREVIOUS_PRESET_SYNCHRONIZED_LOCALLY = 0x0A
|
||||
|
||||
|
||||
@dataclass
|
||||
class HearingAidFeatures:
|
||||
'''See Hearing Access Service 3.1. Hearing Aid Features.'''
|
||||
|
||||
hearing_aid_type: HearingAidType
|
||||
preset_synchronization_support: PresetSynchronizationSupport
|
||||
independent_presets: IndependentPresets
|
||||
dynamic_presets: DynamicPresets
|
||||
writable_presets_support: WritablePresetsSupport
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return bytes(
|
||||
[
|
||||
(self.hearing_aid_type << 0)
|
||||
| (self.preset_synchronization_support << 2)
|
||||
| (self.independent_presets << 3)
|
||||
| (self.dynamic_presets << 4)
|
||||
| (self.writable_presets_support << 5)
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def HearingAidFeatures_from_bytes(data: int) -> HearingAidFeatures:
|
||||
return HearingAidFeatures(
|
||||
HearingAidType(data & 0b11),
|
||||
PresetSynchronizationSupport(data >> 2 & 0b1),
|
||||
IndependentPresets(data >> 3 & 0b1),
|
||||
DynamicPresets(data >> 4 & 0b1),
|
||||
WritablePresetsSupport(data >> 5 & 0b1),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PresetChangedOperation:
|
||||
'''See Hearing Access Service 3.2.2.2. Preset Changed operation.'''
|
||||
|
||||
class ChangeId(OpenIntEnum):
|
||||
# fmt: off
|
||||
GENERIC_UPDATE = 0x00
|
||||
PRESET_RECORD_DELETED = 0x01
|
||||
PRESET_RECORD_AVAILABLE = 0x02
|
||||
PRESET_RECORD_UNAVAILABLE = 0x03
|
||||
|
||||
@dataclass
|
||||
class Generic:
|
||||
prev_index: int
|
||||
preset_record: PresetRecord
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return bytes([self.prev_index]) + bytes(self.preset_record)
|
||||
|
||||
change_id: ChangeId
|
||||
additional_parameters: Union[Generic, int]
|
||||
|
||||
def to_bytes(self, is_last: bool) -> bytes:
|
||||
if isinstance(self.additional_parameters, PresetChangedOperation.Generic):
|
||||
additional_parameters_bytes = bytes(self.additional_parameters)
|
||||
else:
|
||||
additional_parameters_bytes = bytes([self.additional_parameters])
|
||||
|
||||
return (
|
||||
bytes(
|
||||
[
|
||||
HearingAidPresetControlPointOpcode.PRESET_CHANGED,
|
||||
self.change_id,
|
||||
is_last,
|
||||
]
|
||||
)
|
||||
+ additional_parameters_bytes
|
||||
)
|
||||
|
||||
|
||||
class PresetChangedOperationDeleted(PresetChangedOperation):
|
||||
def __init__(self, index) -> None:
|
||||
self.change_id = PresetChangedOperation.ChangeId.PRESET_RECORD_DELETED
|
||||
self.additional_parameters = index
|
||||
|
||||
|
||||
class PresetChangedOperationAvailable(PresetChangedOperation):
|
||||
def __init__(self, index) -> None:
|
||||
self.change_id = PresetChangedOperation.ChangeId.PRESET_RECORD_AVAILABLE
|
||||
self.additional_parameters = index
|
||||
|
||||
|
||||
class PresetChangedOperationUnavailable(PresetChangedOperation):
|
||||
def __init__(self, index) -> None:
|
||||
self.change_id = PresetChangedOperation.ChangeId.PRESET_RECORD_UNAVAILABLE
|
||||
self.additional_parameters = index
|
||||
|
||||
|
||||
@dataclass
|
||||
class PresetRecord:
|
||||
'''See Hearing Access Service 2.8. Preset record.'''
|
||||
|
||||
@dataclass
|
||||
class Property:
|
||||
class Writable(OpenIntEnum):
|
||||
CANNOT_BE_WRITTEN = 0b0
|
||||
CAN_BE_WRITTEN = 0b1
|
||||
|
||||
class IsAvailable(OpenIntEnum):
|
||||
IS_UNAVAILABLE = 0b0
|
||||
IS_AVAILABLE = 0b1
|
||||
|
||||
writable: Writable = Writable.CAN_BE_WRITTEN
|
||||
is_available: IsAvailable = IsAvailable.IS_AVAILABLE
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return bytes([self.writable | (self.is_available << 1)])
|
||||
|
||||
index: int
|
||||
name: str
|
||||
properties: Property = field(default_factory=Property)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return bytes([self.index]) + bytes(self.properties) + self.name.encode('utf-8')
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return (
|
||||
self.properties.is_available
|
||||
== PresetRecord.Property.IsAvailable.IS_AVAILABLE
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Server
|
||||
# -----------------------------------------------------------------------------
|
||||
class HearingAccessService(gatt.TemplateService):
|
||||
UUID = gatt.GATT_HEARING_ACCESS_SERVICE
|
||||
|
||||
hearing_aid_features_characteristic: gatt.Characteristic
|
||||
hearing_aid_preset_control_point: gatt.Characteristic
|
||||
active_preset_index_characteristic: gatt.Characteristic
|
||||
active_preset_index: int
|
||||
active_preset_index_per_device: Dict[Address, int]
|
||||
|
||||
device: Device
|
||||
|
||||
server_features: HearingAidFeatures
|
||||
preset_records: Dict[int, PresetRecord] # key is the preset index
|
||||
read_presets_request_in_progress: bool
|
||||
|
||||
preset_changed_operations_history_per_device: Dict[
|
||||
Address, List[PresetChangedOperation]
|
||||
]
|
||||
|
||||
# Keep an updated list of connected client to send notification to
|
||||
currently_connected_clients: Set[Connection]
|
||||
|
||||
def __init__(
|
||||
self, device: Device, features: HearingAidFeatures, presets: List[PresetRecord]
|
||||
) -> None:
|
||||
self.active_preset_index_per_device = {}
|
||||
self.read_presets_request_in_progress = False
|
||||
self.preset_changed_operations_history_per_device = {}
|
||||
self.currently_connected_clients = set()
|
||||
|
||||
self.device = device
|
||||
self.server_features = features
|
||||
if len(presets) < 1:
|
||||
raise InvalidArgumentError(f'Invalid presets: {presets}')
|
||||
|
||||
self.preset_records = {}
|
||||
for p in presets:
|
||||
if len(p.name.encode()) < 1 or len(p.name.encode()) > 40:
|
||||
raise InvalidArgumentError(f'Invalid name: {p.name}')
|
||||
|
||||
self.preset_records[p.index] = p
|
||||
|
||||
# associate the lowest index as the current active preset at startup
|
||||
self.active_preset_index = sorted(self.preset_records.keys())[0]
|
||||
|
||||
@device.on('connection') # type: ignore
|
||||
def on_connection(connection: Connection) -> None:
|
||||
@connection.on('disconnection') # type: ignore
|
||||
def on_disconnection(_reason) -> None:
|
||||
self.currently_connected_clients.remove(connection)
|
||||
|
||||
# TODO Should we filter on device bonded && device is HAP ?
|
||||
self.currently_connected_clients.add(connection)
|
||||
if (
|
||||
connection.peer_address
|
||||
not in self.preset_changed_operations_history_per_device
|
||||
):
|
||||
self.preset_changed_operations_history_per_device[
|
||||
connection.peer_address
|
||||
] = []
|
||||
return
|
||||
|
||||
async def on_connection_async() -> None:
|
||||
# Send all the PresetChangedOperation that occur when not connected
|
||||
await self._preset_changed_operation(connection)
|
||||
# Update the active preset index if needed
|
||||
await self.notify_active_preset_for_connection(connection)
|
||||
|
||||
connection.abort_on('disconnection', on_connection_async())
|
||||
|
||||
self.hearing_aid_features_characteristic = gatt.Characteristic(
|
||||
uuid=gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC,
|
||||
properties=gatt.Characteristic.Properties.READ,
|
||||
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
|
||||
value=bytes(self.server_features),
|
||||
)
|
||||
self.hearing_aid_preset_control_point = gatt.Characteristic(
|
||||
uuid=gatt.GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC,
|
||||
properties=(
|
||||
gatt.Characteristic.Properties.WRITE
|
||||
| gatt.Characteristic.Properties.INDICATE
|
||||
),
|
||||
permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
|
||||
value=gatt.CharacteristicValue(
|
||||
write=self._on_write_hearing_aid_preset_control_point
|
||||
),
|
||||
)
|
||||
self.active_preset_index_characteristic = gatt.Characteristic(
|
||||
uuid=gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC,
|
||||
properties=(
|
||||
gatt.Characteristic.Properties.READ
|
||||
| gatt.Characteristic.Properties.NOTIFY
|
||||
),
|
||||
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
|
||||
value=gatt.CharacteristicValue(read=self._on_read_active_preset_index),
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
[
|
||||
self.hearing_aid_features_characteristic,
|
||||
self.hearing_aid_preset_control_point,
|
||||
self.active_preset_index_characteristic,
|
||||
]
|
||||
)
|
||||
|
||||
def _on_read_active_preset_index(
|
||||
self, __connection__: Optional[Connection]
|
||||
) -> bytes:
|
||||
return bytes([self.active_preset_index])
|
||||
|
||||
# TODO this need to be triggered when device is unbonded
|
||||
def on_forget(self, addr: Address) -> None:
|
||||
self.preset_changed_operations_history_per_device.pop(addr)
|
||||
|
||||
async def _on_write_hearing_aid_preset_control_point(
|
||||
self, connection: Optional[Connection], value: bytes
|
||||
):
|
||||
assert connection
|
||||
|
||||
opcode = HearingAidPresetControlPointOpcode(value[0])
|
||||
handler = getattr(self, '_on_' + opcode.name.lower())
|
||||
await handler(connection, value)
|
||||
|
||||
async def _on_read_presets_request(
|
||||
self, connection: Optional[Connection], value: bytes
|
||||
):
|
||||
assert connection
|
||||
if connection.att_mtu < 49: # 2.5. GATT sub-procedure requirements
|
||||
logging.warning(f'HAS require MTU >= 49: {connection}')
|
||||
|
||||
if self.read_presets_request_in_progress:
|
||||
raise att.ATT_Error(att.ErrorCode.PROCEDURE_ALREADY_IN_PROGRESS)
|
||||
self.read_presets_request_in_progress = True
|
||||
|
||||
start_index = value[1]
|
||||
if start_index == 0x00:
|
||||
raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE)
|
||||
|
||||
num_presets = value[2]
|
||||
if num_presets == 0x00:
|
||||
raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE)
|
||||
|
||||
# Sending `num_presets` presets ordered by increasing index field, starting from start_index
|
||||
presets = [
|
||||
self.preset_records[key]
|
||||
for key in sorted(self.preset_records.keys())
|
||||
if self.preset_records[key].index >= start_index
|
||||
]
|
||||
del presets[num_presets:]
|
||||
if len(presets) == 0:
|
||||
raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE)
|
||||
|
||||
AsyncRunner.spawn(self._read_preset_response(connection, presets))
|
||||
|
||||
async def _read_preset_response(
|
||||
self, connection: Connection, presets: List[PresetRecord]
|
||||
):
|
||||
# If the ATT bearer is terminated before all notifications or indications are sent, then the server shall consider the Read Presets Request operation aborted and shall not either continue or restart the operation when the client reconnects.
|
||||
try:
|
||||
for i, preset in enumerate(presets):
|
||||
await connection.device.indicate_subscriber(
|
||||
connection,
|
||||
self.hearing_aid_preset_control_point,
|
||||
value=bytes(
|
||||
[
|
||||
HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE,
|
||||
i == len(presets) - 1,
|
||||
]
|
||||
)
|
||||
+ bytes(preset),
|
||||
)
|
||||
|
||||
finally:
|
||||
# indicate_subscriber can raise a TimeoutError, we need to gracefully terminate the operation
|
||||
self.read_presets_request_in_progress = False
|
||||
|
||||
async def generic_update(self, op: PresetChangedOperation) -> None:
|
||||
'''Server API to perform a generic update. It is the responsibility of the caller to modify the preset_records to match the PresetChangedOperation being sent'''
|
||||
await self._notifyPresetOperations(op)
|
||||
|
||||
async def delete_preset(self, index: int) -> None:
|
||||
'''Server API to delete a preset. It should not be the current active preset'''
|
||||
|
||||
if index == self.active_preset_index:
|
||||
raise InvalidStateError('Cannot delete active preset')
|
||||
|
||||
del self.preset_records[index]
|
||||
await self._notifyPresetOperations(PresetChangedOperationDeleted(index))
|
||||
|
||||
async def available_preset(self, index: int) -> None:
|
||||
'''Server API to make a preset available'''
|
||||
|
||||
preset = self.preset_records[index]
|
||||
preset.properties.is_available = PresetRecord.Property.IsAvailable.IS_AVAILABLE
|
||||
await self._notifyPresetOperations(PresetChangedOperationAvailable(index))
|
||||
|
||||
async def unavailable_preset(self, index: int) -> None:
|
||||
'''Server API to make a preset unavailable. It should not be the current active preset'''
|
||||
|
||||
if index == self.active_preset_index:
|
||||
raise InvalidStateError('Cannot set active preset as unavailable')
|
||||
|
||||
preset = self.preset_records[index]
|
||||
preset.properties.is_available = (
|
||||
PresetRecord.Property.IsAvailable.IS_UNAVAILABLE
|
||||
)
|
||||
await self._notifyPresetOperations(PresetChangedOperationUnavailable(index))
|
||||
|
||||
async def _preset_changed_operation(self, connection: Connection) -> None:
|
||||
'''Send all PresetChangedOperation saved for a given connection'''
|
||||
op_list = self.preset_changed_operations_history_per_device.get(
|
||||
connection.peer_address, []
|
||||
)
|
||||
|
||||
# Notification will be sent in index order
|
||||
def get_op_index(op: PresetChangedOperation) -> int:
|
||||
if isinstance(op.additional_parameters, PresetChangedOperation.Generic):
|
||||
return op.additional_parameters.prev_index
|
||||
return op.additional_parameters
|
||||
|
||||
op_list.sort(key=get_op_index)
|
||||
# If the ATT bearer is terminated before all notifications or indications are sent, then the server shall consider the Preset Changed operation aborted and shall continue the operation when the client reconnects.
|
||||
while len(op_list) > 0:
|
||||
try:
|
||||
await connection.device.indicate_subscriber(
|
||||
connection,
|
||||
self.hearing_aid_preset_control_point,
|
||||
value=op_list[0].to_bytes(len(op_list) == 1),
|
||||
)
|
||||
# Remove item once sent, and keep the non sent item in the list
|
||||
op_list.pop(0)
|
||||
except TimeoutError:
|
||||
break
|
||||
|
||||
async def _notifyPresetOperations(self, op: PresetChangedOperation) -> None:
|
||||
for historyList in self.preset_changed_operations_history_per_device.values():
|
||||
historyList.append(op)
|
||||
|
||||
for connection in self.currently_connected_clients:
|
||||
await self._preset_changed_operation(connection)
|
||||
|
||||
async def _on_write_preset_name(
|
||||
self, connection: Optional[Connection], value: bytes
|
||||
):
|
||||
assert connection
|
||||
|
||||
if self.read_presets_request_in_progress:
|
||||
raise att.ATT_Error(att.ErrorCode.PROCEDURE_ALREADY_IN_PROGRESS)
|
||||
|
||||
index = value[1]
|
||||
preset = self.preset_records.get(index, None)
|
||||
if (
|
||||
not preset
|
||||
or preset.properties.writable
|
||||
== PresetRecord.Property.Writable.CANNOT_BE_WRITTEN
|
||||
):
|
||||
raise att.ATT_Error(ErrorCode.WRITE_NAME_NOT_ALLOWED)
|
||||
|
||||
name = value[2:].decode('utf-8')
|
||||
if not name or len(name) > 40:
|
||||
raise att.ATT_Error(ErrorCode.INVALID_PARAMETERS_LENGTH)
|
||||
|
||||
preset.name = name
|
||||
|
||||
await self.generic_update(
|
||||
PresetChangedOperation(
|
||||
PresetChangedOperation.ChangeId.GENERIC_UPDATE,
|
||||
PresetChangedOperation.Generic(index, preset),
|
||||
)
|
||||
)
|
||||
|
||||
async def notify_active_preset_for_connection(self, connection: Connection) -> None:
|
||||
if (
|
||||
self.active_preset_index_per_device.get(connection.peer_address, 0x00)
|
||||
== self.active_preset_index
|
||||
):
|
||||
# Nothing to do, peer is already updated
|
||||
return
|
||||
|
||||
await connection.device.notify_subscriber(
|
||||
connection,
|
||||
attribute=self.active_preset_index_characteristic,
|
||||
value=bytes([self.active_preset_index]),
|
||||
)
|
||||
self.active_preset_index_per_device[connection.peer_address] = (
|
||||
self.active_preset_index
|
||||
)
|
||||
|
||||
async def notify_active_preset(self) -> None:
|
||||
for connection in self.currently_connected_clients:
|
||||
await self.notify_active_preset_for_connection(connection)
|
||||
|
||||
async def set_active_preset(
|
||||
self, connection: Optional[Connection], value: bytes
|
||||
) -> None:
|
||||
assert connection
|
||||
index = value[1]
|
||||
preset = self.preset_records.get(index, None)
|
||||
if (
|
||||
not preset
|
||||
or preset.properties.is_available
|
||||
!= PresetRecord.Property.IsAvailable.IS_AVAILABLE
|
||||
):
|
||||
raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE)
|
||||
|
||||
if index == self.active_preset_index:
|
||||
# Already at correct value
|
||||
return
|
||||
|
||||
self.active_preset_index = index
|
||||
await self.notify_active_preset()
|
||||
|
||||
async def _on_set_active_preset(
|
||||
self, connection: Optional[Connection], value: bytes
|
||||
):
|
||||
await self.set_active_preset(connection, value)
|
||||
|
||||
async def set_next_or_previous_preset(
|
||||
self, connection: Optional[Connection], is_previous
|
||||
):
|
||||
'''Set the next or the previous preset as active'''
|
||||
assert connection
|
||||
|
||||
if self.active_preset_index == 0x00:
|
||||
raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE)
|
||||
|
||||
first_preset: Optional[PresetRecord] = None # To loop to first preset
|
||||
next_preset: Optional[PresetRecord] = None
|
||||
for index, record in sorted(self.preset_records.items(), reverse=is_previous):
|
||||
if not record.is_available():
|
||||
continue
|
||||
if first_preset == None:
|
||||
first_preset = record
|
||||
if is_previous:
|
||||
if index >= self.active_preset_index:
|
||||
continue
|
||||
elif index <= self.active_preset_index:
|
||||
continue
|
||||
next_preset = record
|
||||
break
|
||||
|
||||
if not first_preset: # If no other preset are available
|
||||
raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE)
|
||||
|
||||
if next_preset:
|
||||
self.active_preset_index = next_preset.index
|
||||
else:
|
||||
self.active_preset_index = first_preset.index
|
||||
await self.notify_active_preset()
|
||||
|
||||
async def _on_set_next_preset(
|
||||
self, connection: Optional[Connection], __value__: bytes
|
||||
) -> None:
|
||||
await self.set_next_or_previous_preset(connection, False)
|
||||
|
||||
async def _on_set_previous_preset(
|
||||
self, connection: Optional[Connection], __value__: bytes
|
||||
) -> None:
|
||||
await self.set_next_or_previous_preset(connection, True)
|
||||
|
||||
async def _on_set_active_preset_synchronized_locally(
|
||||
self, connection: Optional[Connection], value: bytes
|
||||
):
|
||||
if (
|
||||
self.server_features.preset_synchronization_support
|
||||
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED
|
||||
):
|
||||
raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED)
|
||||
await self.set_active_preset(connection, value)
|
||||
# TODO (low priority) inform other server of the change
|
||||
|
||||
async def _on_set_next_preset_synchronized_locally(
|
||||
self, connection: Optional[Connection], __value__: bytes
|
||||
):
|
||||
if (
|
||||
self.server_features.preset_synchronization_support
|
||||
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED
|
||||
):
|
||||
raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED)
|
||||
await self.set_next_or_previous_preset(connection, False)
|
||||
# TODO (low priority) inform other server of the change
|
||||
|
||||
async def _on_set_previous_preset_synchronized_locally(
|
||||
self, connection: Optional[Connection], __value__: bytes
|
||||
):
|
||||
if (
|
||||
self.server_features.preset_synchronization_support
|
||||
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED
|
||||
):
|
||||
raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED)
|
||||
await self.set_next_or_previous_preset(connection, True)
|
||||
# TODO (low priority) inform other server of the change
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Client
|
||||
# -----------------------------------------------------------------------------
|
||||
class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy):
|
||||
SERVICE_CLASS = HearingAccessService
|
||||
|
||||
hearing_aid_preset_control_point: gatt_client.CharacteristicProxy
|
||||
preset_control_point_indications: asyncio.Queue
|
||||
|
||||
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
|
||||
self.service_proxy = service_proxy
|
||||
|
||||
self.server_features = gatt.PackedCharacteristicAdapter(
|
||||
service_proxy.get_characteristics_by_uuid(
|
||||
gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC
|
||||
)[0],
|
||||
'B',
|
||||
)
|
||||
|
||||
self.hearing_aid_preset_control_point = (
|
||||
service_proxy.get_characteristics_by_uuid(
|
||||
gatt.GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC
|
||||
)[0]
|
||||
)
|
||||
|
||||
self.active_preset_index = gatt.PackedCharacteristicAdapter(
|
||||
service_proxy.get_characteristics_by_uuid(
|
||||
gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC
|
||||
)[0],
|
||||
'B',
|
||||
)
|
||||
|
||||
async def setup_subscription(self):
|
||||
self.preset_control_point_indications = asyncio.Queue()
|
||||
self.active_preset_index_notification = asyncio.Queue()
|
||||
|
||||
def on_active_preset_index_notification(data: bytes):
|
||||
self.active_preset_index_notification.put_nowait(data)
|
||||
|
||||
def on_preset_control_point_indication(data: bytes):
|
||||
self.preset_control_point_indications.put_nowait(data)
|
||||
|
||||
await self.hearing_aid_preset_control_point.subscribe(
|
||||
functools.partial(on_preset_control_point_indication), prefer_notify=False
|
||||
)
|
||||
|
||||
await self.active_preset_index.subscribe(
|
||||
functools.partial(on_active_preset_index_notification)
|
||||
)
|
||||
@@ -0,0 +1,107 @@
|
||||
# Copyright 2024 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
|
||||
from bumble.core import AdvertisingData
|
||||
from bumble.device import Device
|
||||
from bumble import att
|
||||
from bumble.profiles.hap import (
|
||||
HearingAccessService,
|
||||
HearingAidFeatures,
|
||||
HearingAidType,
|
||||
PresetSynchronizationSupport,
|
||||
IndependentPresets,
|
||||
DynamicPresets,
|
||||
WritablePresetsSupport,
|
||||
PresetRecord,
|
||||
)
|
||||
|
||||
from bumble.transport import open_transport_or_link
|
||||
|
||||
server_features = HearingAidFeatures(
|
||||
HearingAidType.MONAURAL_HEARING_AID,
|
||||
PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED,
|
||||
IndependentPresets.IDENTICAL_PRESET_RECORD,
|
||||
DynamicPresets.PRESET_RECORDS_DOES_NOT_CHANGE,
|
||||
WritablePresetsSupport.WRITABLE_PRESET_RECORDS_SUPPORTED,
|
||||
)
|
||||
|
||||
foo_preset = PresetRecord(1, "foo preset")
|
||||
bar_preset = PresetRecord(50, "bar preset")
|
||||
foobar_preset = PresetRecord(5, "foobar preset")
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print('Usage: run_hap_server.py <config-file> <transport-spec-for-device>')
|
||||
print('example: run_hap_server.py device1.json pty:hci_pty')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
await device.power_on()
|
||||
|
||||
hap = HearingAccessService(
|
||||
device, server_features, [foo_preset, bar_preset, foobar_preset]
|
||||
)
|
||||
device.add_service(hap)
|
||||
|
||||
advertising_data = bytes(
|
||||
AdvertisingData(
|
||||
[
|
||||
(
|
||||
AdvertisingData.COMPLETE_LOCAL_NAME,
|
||||
bytes('Bumble HearingAccessService', 'utf-8'),
|
||||
),
|
||||
(
|
||||
AdvertisingData.FLAGS,
|
||||
bytes(
|
||||
[
|
||||
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
|
||||
| AdvertisingData.BR_EDR_HOST_FLAG
|
||||
| AdvertisingData.BR_EDR_CONTROLLER_FLAG
|
||||
]
|
||||
),
|
||||
),
|
||||
(
|
||||
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
|
||||
bytes(HearingAccessService.UUID),
|
||||
),
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
await device.create_advertising_set(
|
||||
advertising_data=advertising_data,
|
||||
auto_restart=True,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
|
||||
asyncio.run(main())
|
||||
@@ -0,0 +1,484 @@
|
||||
# Copyright 2024 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from bumble import device
|
||||
|
||||
from bumble.att import ATT_Error
|
||||
|
||||
from bumble.profiles.aics import (
|
||||
Mute,
|
||||
AICSService,
|
||||
AudioInputState,
|
||||
AICSServiceProxy,
|
||||
GainMode,
|
||||
AudioInputStatus,
|
||||
AudioInputControlPointOpCode,
|
||||
GAIN_SETTINGS_MAX_VALUE,
|
||||
GAIN_SETTINGS_MIN_VALUE,
|
||||
ErrorCode,
|
||||
)
|
||||
|
||||
from .test_utils import TwoDevices
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Tests
|
||||
# -----------------------------------------------------------------------------
|
||||
aics_service = AICSService()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def aics_client():
|
||||
devices = TwoDevices()
|
||||
devices[0].add_service(aics_service)
|
||||
|
||||
await devices.setup_connection()
|
||||
|
||||
assert devices.connections[0] is not None
|
||||
assert devices.connections[1] is not None
|
||||
|
||||
devices.connections[0].encryption = 1
|
||||
devices.connections[1].encryption = 1
|
||||
|
||||
peer = device.Peer(devices.connections[1])
|
||||
aics_client = await peer.discover_service_and_create_proxy(AICSServiceProxy)
|
||||
|
||||
yield aics_client
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_init_service(aics_client: AICSServiceProxy):
|
||||
assert await aics_client.audio_input_state.read_value() == AudioInputState(
|
||||
gain_settings=0,
|
||||
mute=Mute.NOT_MUTED,
|
||||
gain_mode=GainMode.MANUAL,
|
||||
change_counter=0,
|
||||
)
|
||||
assert await aics_client.gain_settings_properties.read_value() == (1, 0, 255)
|
||||
assert await aics_client.audio_input_status.read_value() == (
|
||||
AudioInputStatus.ACTIVE
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wrong_opcode_raise_error(aics_client: AICSServiceProxy):
|
||||
with pytest.raises(ATT_Error) as e:
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
0xFF,
|
||||
]
|
||||
),
|
||||
with_response=True,
|
||||
)
|
||||
|
||||
assert e.value.error_code == ErrorCode.OPCODE_NOT_SUPPORTED
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_gain_setting_when_gain_mode_automatic_only(
|
||||
aics_client: AICSServiceProxy,
|
||||
):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
|
||||
|
||||
change_counter = 0
|
||||
gain_settings = 120
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_GAIN_SETTING,
|
||||
change_counter,
|
||||
gain_settings,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
# Unchanged
|
||||
assert await aics_client.audio_input_state.read_value() == AudioInputState(
|
||||
gain_settings=0,
|
||||
mute=Mute.NOT_MUTED,
|
||||
gain_mode=GainMode.AUTOMATIC_ONLY,
|
||||
change_counter=0,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_gain_setting_when_gain_mode_automatic(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
|
||||
change_counter = 0
|
||||
gain_settings = 120
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_GAIN_SETTING,
|
||||
change_counter,
|
||||
gain_settings,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
# Unchanged
|
||||
assert await aics_client.audio_input_state.read_value() == AudioInputState(
|
||||
gain_settings=0,
|
||||
mute=Mute.NOT_MUTED,
|
||||
gain_mode=GainMode.AUTOMATIC,
|
||||
change_counter=0,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_gain_setting_when_gain_mode_MANUAL(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.MANUAL
|
||||
change_counter = 0
|
||||
gain_settings = 120
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_GAIN_SETTING,
|
||||
change_counter,
|
||||
gain_settings,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
assert await aics_client.audio_input_state.read_value() == AudioInputState(
|
||||
gain_settings=gain_settings,
|
||||
mute=Mute.NOT_MUTED,
|
||||
gain_mode=GainMode.MANUAL,
|
||||
change_counter=change_counter,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_gain_setting_when_gain_mode_MANUAL_ONLY(
|
||||
aics_client: AICSServiceProxy,
|
||||
):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
|
||||
change_counter = 0
|
||||
gain_settings = 120
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_GAIN_SETTING,
|
||||
change_counter,
|
||||
gain_settings,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
assert await aics_client.audio_input_state.read_value() == AudioInputState(
|
||||
gain_settings=gain_settings,
|
||||
mute=Mute.NOT_MUTED,
|
||||
gain_mode=GainMode.MANUAL_ONLY,
|
||||
change_counter=change_counter,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unmute_when_muted(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.mute = Mute.MUTED
|
||||
change_counter = 0
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.UNMUTE,
|
||||
change_counter,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
change_counter += 1
|
||||
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.mute == Mute.NOT_MUTED
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unmute_when_mute_disabled(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.mute = Mute.DISABLED
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
with pytest.raises(ATT_Error) as e:
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.UNMUTE,
|
||||
change_counter,
|
||||
]
|
||||
),
|
||||
with_response=True,
|
||||
)
|
||||
|
||||
assert e.value.error_code == ErrorCode.MUTE_DISABLED
|
||||
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.mute == Mute.DISABLED
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mute_when_not_muted(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.mute = Mute.NOT_MUTED
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.MUTE,
|
||||
change_counter,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
change_counter += 1
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.mute == Mute.MUTED
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mute_when_mute_disabled(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.mute = Mute.DISABLED
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
with pytest.raises(ATT_Error) as e:
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.MUTE,
|
||||
change_counter,
|
||||
]
|
||||
),
|
||||
with_response=True,
|
||||
)
|
||||
|
||||
assert e.value.error_code == ErrorCode.MUTE_DISABLED
|
||||
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.mute == Mute.DISABLED
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_manual_gain_mode_when_automatic(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
|
||||
change_counter,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
change_counter += 1
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.gain_mode == GainMode.MANUAL
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_manual_gain_mode_when_already_manual(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.MANUAL
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
|
||||
change_counter,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
# No change expected
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.gain_mode == GainMode.MANUAL
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_manual_gain_mode_when_manual_only(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
with pytest.raises(ATT_Error) as e:
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
|
||||
change_counter,
|
||||
]
|
||||
),
|
||||
with_response=True,
|
||||
)
|
||||
|
||||
assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
|
||||
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.gain_mode == GainMode.MANUAL_ONLY
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_manual_gain_mode_when_automatic_only(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
with pytest.raises(ATT_Error) as e:
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
|
||||
change_counter,
|
||||
]
|
||||
),
|
||||
with_response=True,
|
||||
)
|
||||
|
||||
assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
|
||||
|
||||
# No change expected
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.gain_mode == GainMode.AUTOMATIC_ONLY
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_automatic_gain_mode_when_manual(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.MANUAL
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
|
||||
change_counter,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
change_counter += 1
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.gain_mode == GainMode.AUTOMATIC
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_automatic_gain_mode_when_already_automatic(
|
||||
aics_client: AICSServiceProxy,
|
||||
):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
|
||||
change_counter,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
# No change expected
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.gain_mode == GainMode.AUTOMATIC
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_automatic_gain_mode_when_manual_only(aics_client: AICSServiceProxy):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
with pytest.raises(ATT_Error) as e:
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
|
||||
change_counter,
|
||||
]
|
||||
),
|
||||
with_response=True,
|
||||
)
|
||||
|
||||
assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
|
||||
|
||||
# No change expected
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.gain_mode == GainMode.MANUAL_ONLY
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_automatic_gain_mode_when_automatic_only(
|
||||
aics_client: AICSServiceProxy,
|
||||
):
|
||||
aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
|
||||
aics_service.audio_input_state.change_counter = 0
|
||||
change_counter = 0
|
||||
|
||||
with pytest.raises(ATT_Error) as e:
|
||||
await aics_client.audio_input_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
|
||||
change_counter,
|
||||
]
|
||||
),
|
||||
with_response=True,
|
||||
)
|
||||
|
||||
assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
|
||||
|
||||
# No change expected
|
||||
state: AudioInputState = await aics_client.audio_input_state.read_value()
|
||||
assert state.gain_mode == GainMode.AUTOMATIC_ONLY
|
||||
assert state.change_counter == change_counter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_audio_input_description_initial_value(aics_client: AICSServiceProxy):
|
||||
description = await aics_client.audio_input_description.read_value()
|
||||
assert description.decode('utf-8') == "Bluetooth"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_audio_input_description_write_and_read(aics_client: AICSServiceProxy):
|
||||
new_description = "Line Input".encode('utf-8')
|
||||
|
||||
await aics_client.audio_input_description.write_value(new_description)
|
||||
|
||||
description = await aics_client.audio_input_description.read_value()
|
||||
assert description == new_description
|
||||
@@ -47,8 +47,10 @@ from bumble.att import (
|
||||
ATT_EXCHANGE_MTU_REQUEST,
|
||||
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
|
||||
ATT_PDU,
|
||||
ATT_Error,
|
||||
ATT_Error_Response,
|
||||
ATT_Read_By_Group_Type_Request,
|
||||
ErrorCode,
|
||||
)
|
||||
from .test_utils import async_barrier
|
||||
|
||||
@@ -1247,6 +1249,32 @@ async def test_get_characteristics_by_uuid():
|
||||
assert len(s) == 1
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_return_error():
|
||||
[client, server] = LinkedDevices().devices[:2]
|
||||
|
||||
on_write = Mock(side_effect=ATT_Error(error_code=ErrorCode.VALUE_NOT_ALLOWED))
|
||||
characteristic = Characteristic(
|
||||
'1234',
|
||||
Characteristic.Properties.WRITE,
|
||||
Characteristic.Permissions.WRITEABLE,
|
||||
CharacteristicValue(write=on_write),
|
||||
)
|
||||
service = Service('ABCD', [characteristic])
|
||||
server.add_service(service)
|
||||
|
||||
await client.power_on()
|
||||
await server.power_on()
|
||||
connection = await client.connect(server.random_address)
|
||||
|
||||
async with Peer(connection) as peer:
|
||||
c = peer.get_characteristics_by_uuid(uuid=UUID('1234'))[0]
|
||||
with pytest.raises(ATT_Error) as e:
|
||||
await c.write_value(b'', with_response=True)
|
||||
assert e.value.error_code == ErrorCode.VALUE_NOT_ALLOWED
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
|
||||
@@ -0,0 +1,227 @@
|
||||
# Copyright 2024 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import pytest
|
||||
import functools
|
||||
import pytest_asyncio
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from bumble import att, device
|
||||
from bumble.profiles import hap
|
||||
from .test_utils import TwoDevices
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
foo_preset = hap.PresetRecord(1, "foo preset")
|
||||
bar_preset = hap.PresetRecord(50, "bar preset")
|
||||
foobar_preset = hap.PresetRecord(5, "foobar preset")
|
||||
unavailable_preset = hap.PresetRecord(
|
||||
78,
|
||||
"foobar preset",
|
||||
hap.PresetRecord.Property(
|
||||
hap.PresetRecord.Property.Writable.CANNOT_BE_WRITTEN,
|
||||
hap.PresetRecord.Property.IsAvailable.IS_UNAVAILABLE,
|
||||
),
|
||||
)
|
||||
|
||||
server_features = hap.HearingAidFeatures(
|
||||
hap.HearingAidType.MONAURAL_HEARING_AID,
|
||||
hap.PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED,
|
||||
hap.IndependentPresets.IDENTICAL_PRESET_RECORD,
|
||||
hap.DynamicPresets.PRESET_RECORDS_DOES_NOT_CHANGE,
|
||||
hap.WritablePresetsSupport.WRITABLE_PRESET_RECORDS_SUPPORTED,
|
||||
)
|
||||
|
||||
TIMEOUT = 0.1
|
||||
|
||||
|
||||
async def assert_queue_is_empty(queue: asyncio.Queue):
|
||||
assert queue.empty()
|
||||
|
||||
# Check that nothing is being added during TIMEOUT secondes
|
||||
if sys.version_info >= (3, 11):
|
||||
with pytest.raises(TimeoutError):
|
||||
await asyncio.wait_for(queue.get(), TIMEOUT)
|
||||
else:
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await asyncio.wait_for(queue.get(), TIMEOUT)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest_asyncio.fixture
|
||||
async def hap_client():
|
||||
devices = TwoDevices()
|
||||
devices[0].add_service(
|
||||
hap.HearingAccessService(
|
||||
devices[0],
|
||||
server_features,
|
||||
[foo_preset, bar_preset, foobar_preset, unavailable_preset],
|
||||
)
|
||||
)
|
||||
|
||||
await devices.setup_connection()
|
||||
# TODO negotiate MTU > 49 to not truncate preset names
|
||||
|
||||
# Mock encryption.
|
||||
devices.connections[0].encryption = 1 # type: ignore
|
||||
devices.connections[1].encryption = 1 # type: ignore
|
||||
|
||||
peer = device.Peer(devices.connections[1]) # type: ignore
|
||||
hap_client = await peer.discover_service_and_create_proxy(
|
||||
hap.HearingAccessServiceProxy
|
||||
)
|
||||
assert hap_client
|
||||
await hap_client.setup_subscription()
|
||||
|
||||
yield hap_client
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_init_service(hap_client: hap.HearingAccessServiceProxy):
|
||||
assert (
|
||||
hap.HearingAidFeatures_from_bytes(await hap_client.server_features.read_value())
|
||||
== server_features
|
||||
)
|
||||
assert (await hap_client.active_preset_index.read_value()) == (foo_preset.index)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_all_presets(hap_client: hap.HearingAccessServiceProxy):
|
||||
await hap_client.hearing_aid_preset_control_point.write_value(
|
||||
bytes([hap.HearingAidPresetControlPointOpcode.READ_PRESETS_REQUEST, 1, 0xFF])
|
||||
)
|
||||
assert (await hap_client.preset_control_point_indications.get()) == bytes(
|
||||
[hap.HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 0]
|
||||
) + bytes(foo_preset)
|
||||
assert (await hap_client.preset_control_point_indications.get()) == bytes(
|
||||
[hap.HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 0]
|
||||
) + bytes(foobar_preset)
|
||||
assert (await hap_client.preset_control_point_indications.get()) == bytes(
|
||||
[hap.HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 0]
|
||||
) + bytes(bar_preset)
|
||||
assert (await hap_client.preset_control_point_indications.get()) == bytes(
|
||||
[hap.HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 1]
|
||||
) + bytes(unavailable_preset)
|
||||
|
||||
await assert_queue_is_empty(hap_client.preset_control_point_indications)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_partial_presets(hap_client: hap.HearingAccessServiceProxy):
|
||||
await hap_client.hearing_aid_preset_control_point.write_value(
|
||||
bytes([hap.HearingAidPresetControlPointOpcode.READ_PRESETS_REQUEST, 3, 2])
|
||||
)
|
||||
assert (await hap_client.preset_control_point_indications.get())[2:] == bytes(
|
||||
foobar_preset
|
||||
)
|
||||
assert (await hap_client.preset_control_point_indications.get())[2:] == bytes(
|
||||
bar_preset
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_active_preset_valid(hap_client: hap.HearingAccessServiceProxy):
|
||||
await hap_client.hearing_aid_preset_control_point.write_value(
|
||||
bytes(
|
||||
[hap.HearingAidPresetControlPointOpcode.SET_ACTIVE_PRESET, bar_preset.index]
|
||||
)
|
||||
)
|
||||
assert (await hap_client.active_preset_index_notification.get()) == bar_preset.index
|
||||
|
||||
assert (await hap_client.active_preset_index.read_value()) == (bar_preset.index)
|
||||
|
||||
await assert_queue_is_empty(hap_client.active_preset_index_notification)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_active_preset_invalid(hap_client: hap.HearingAccessServiceProxy):
|
||||
with pytest.raises(att.ATT_Error) as e:
|
||||
await hap_client.hearing_aid_preset_control_point.write_value(
|
||||
bytes(
|
||||
[
|
||||
hap.HearingAidPresetControlPointOpcode.SET_ACTIVE_PRESET,
|
||||
unavailable_preset.index,
|
||||
]
|
||||
),
|
||||
with_response=True,
|
||||
)
|
||||
assert e.value.error_code == hap.ErrorCode.PRESET_OPERATION_NOT_POSSIBLE
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_next_preset(hap_client: hap.HearingAccessServiceProxy):
|
||||
await hap_client.hearing_aid_preset_control_point.write_value(
|
||||
bytes([hap.HearingAidPresetControlPointOpcode.SET_NEXT_PRESET])
|
||||
)
|
||||
assert (
|
||||
await hap_client.active_preset_index_notification.get()
|
||||
) == foobar_preset.index
|
||||
|
||||
assert (await hap_client.active_preset_index.read_value()) == (foobar_preset.index)
|
||||
|
||||
await assert_queue_is_empty(hap_client.active_preset_index_notification)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_next_preset_will_loop_to_first(
|
||||
hap_client: hap.HearingAccessServiceProxy,
|
||||
):
|
||||
async def go_next(new_preset: hap.PresetRecord):
|
||||
await hap_client.hearing_aid_preset_control_point.write_value(
|
||||
bytes([hap.HearingAidPresetControlPointOpcode.SET_NEXT_PRESET])
|
||||
)
|
||||
assert (
|
||||
await hap_client.active_preset_index_notification.get()
|
||||
) == new_preset.index
|
||||
|
||||
assert (await hap_client.active_preset_index.read_value()) == (new_preset.index)
|
||||
|
||||
await go_next(foobar_preset)
|
||||
await go_next(bar_preset)
|
||||
await go_next(foo_preset)
|
||||
|
||||
# Note that there is a invalid preset in the preset record of the server
|
||||
|
||||
await assert_queue_is_empty(hap_client.active_preset_index_notification)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_previous_preset_will_loop_to_last(
|
||||
hap_client: hap.HearingAccessServiceProxy,
|
||||
):
|
||||
await hap_client.hearing_aid_preset_control_point.write_value(
|
||||
bytes([hap.HearingAidPresetControlPointOpcode.SET_PREVIOUS_PRESET])
|
||||
)
|
||||
assert (await hap_client.active_preset_index_notification.get()) == bar_preset.index
|
||||
|
||||
assert (await hap_client.active_preset_index.read_value()) == (bar_preset.index)
|
||||
|
||||
await assert_queue_is_empty(hap_client.active_preset_index_notification)
|
||||
@@ -60,6 +60,8 @@ from bumble.hci import (
|
||||
HCI_Number_Of_Completed_Packets_Event,
|
||||
HCI_Packet,
|
||||
HCI_PIN_Code_Request_Reply_Command,
|
||||
HCI_Read_Local_Supported_Codecs_Command,
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command,
|
||||
HCI_Read_Local_Supported_Commands_Command,
|
||||
HCI_Read_Local_Supported_Features_Command,
|
||||
HCI_Read_Local_Version_Information_Command,
|
||||
@@ -476,6 +478,51 @@ def test_HCI_LE_Setup_ISO_Data_Path_Command():
|
||||
basic_check(command)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_Read_Local_Supported_Codecs_Command_Complete():
|
||||
returned_parameters = (
|
||||
HCI_Read_Local_Supported_Codecs_Command.parse_return_parameters(
|
||||
bytes([HCI_SUCCESS, 3, CodecID.A_LOG, CodecID.CVSD, CodecID.LINEAR_PCM, 0])
|
||||
)
|
||||
)
|
||||
assert returned_parameters.standard_codec_ids == [
|
||||
CodecID.A_LOG,
|
||||
CodecID.CVSD,
|
||||
CodecID.LINEAR_PCM,
|
||||
]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_Read_Local_Supported_Codecs_V2_Command_Complete():
|
||||
returned_parameters = (
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command.parse_return_parameters(
|
||||
bytes(
|
||||
[
|
||||
HCI_SUCCESS,
|
||||
3,
|
||||
CodecID.A_LOG,
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL,
|
||||
CodecID.CVSD,
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
|
||||
CodecID.LINEAR_PCM,
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
|
||||
0,
|
||||
]
|
||||
)
|
||||
)
|
||||
)
|
||||
assert returned_parameters.standard_codec_ids == [
|
||||
CodecID.A_LOG,
|
||||
CodecID.CVSD,
|
||||
CodecID.LINEAR_PCM,
|
||||
]
|
||||
assert returned_parameters.standard_codec_transports == [
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL,
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
|
||||
]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_address():
|
||||
a = Address('C4:F2:17:1A:1D:BB')
|
||||
|
||||
Reference in New Issue
Block a user