mirror of
https://github.com/google/bumble.git
synced 2026-04-17 00:35:31 +00:00
822 lines
30 KiB
Python
822 lines
30 KiB
Python
# Copyright 2023 Google LLC
|
||
#
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
#
|
||
# https://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Imports
|
||
# -----------------------------------------------------------------------------
|
||
import collections.abc
|
||
import logging
|
||
import asyncio
|
||
import dataclasses
|
||
import enum
|
||
import traceback
|
||
import warnings
|
||
from typing import Dict, List, Union, Set, TYPE_CHECKING
|
||
|
||
from . import at
|
||
from . import rfcomm
|
||
|
||
from bumble.colors import color
|
||
from bumble.core import (
|
||
ProtocolError,
|
||
BT_GENERIC_AUDIO_SERVICE,
|
||
BT_HANDSFREE_SERVICE,
|
||
BT_L2CAP_PROTOCOL_ID,
|
||
BT_RFCOMM_PROTOCOL_ID,
|
||
)
|
||
from bumble.sdp import (
|
||
DataElement,
|
||
ServiceAttribute,
|
||
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
|
||
)
|
||
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Logging
|
||
# -----------------------------------------------------------------------------
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Error
|
||
# -----------------------------------------------------------------------------
|
||
|
||
|
||
class HfpProtocolError(ProtocolError):
|
||
def __init__(self, error_name: str = '', details: str = ''):
|
||
super().__init__(None, 'hfp', error_name, details)
|
||
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Protocol Support
|
||
# -----------------------------------------------------------------------------
|
||
|
||
# -----------------------------------------------------------------------------
|
||
class HfpProtocol:
|
||
dlc: rfcomm.DLC
|
||
buffer: str
|
||
lines: collections.deque
|
||
lines_available: asyncio.Event
|
||
|
||
def __init__(self, dlc: rfcomm.DLC) -> None:
|
||
warnings.warn("See HfProtocol", DeprecationWarning)
|
||
self.dlc = dlc
|
||
self.buffer = ''
|
||
self.lines = collections.deque()
|
||
self.lines_available = asyncio.Event()
|
||
|
||
dlc.sink = self.feed
|
||
|
||
def feed(self, data: Union[bytes, str]) -> None:
|
||
# Convert the data to a string if needed
|
||
if isinstance(data, bytes):
|
||
data = data.decode('utf-8')
|
||
|
||
logger.debug(f'<<< Data received: {data}')
|
||
|
||
# Add to the buffer and look for lines
|
||
self.buffer += data
|
||
while (separator := self.buffer.find('\r')) >= 0:
|
||
line = self.buffer[:separator].strip()
|
||
self.buffer = self.buffer[separator + 1 :]
|
||
if len(line) > 0:
|
||
self.on_line(line)
|
||
|
||
def on_line(self, line: str) -> None:
|
||
self.lines.append(line)
|
||
self.lines_available.set()
|
||
|
||
def send_command_line(self, line: str) -> None:
|
||
logger.debug(color(f'>>> {line}', 'yellow'))
|
||
self.dlc.write(line + '\r')
|
||
|
||
def send_response_line(self, line: str) -> None:
|
||
logger.debug(color(f'>>> {line}', 'yellow'))
|
||
self.dlc.write('\r\n' + line + '\r\n')
|
||
|
||
async def next_line(self) -> str:
|
||
await self.lines_available.wait()
|
||
line = self.lines.popleft()
|
||
if not self.lines:
|
||
self.lines_available.clear()
|
||
logger.debug(color(f'<<< {line}', 'green'))
|
||
return line
|
||
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Normative protocol definitions
|
||
# -----------------------------------------------------------------------------
|
||
|
||
|
||
# HF supported features (AT+BRSF=) (normative).
|
||
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
|
||
# and 3GPP 27.007
|
||
class HfFeature(enum.IntFlag):
|
||
EC_NR = 0x001 # Echo Cancel & Noise reduction
|
||
THREE_WAY_CALLING = 0x002
|
||
CLI_PRESENTATION_CAPABILITY = 0x004
|
||
VOICE_RECOGNITION_ACTIVATION = 0x008
|
||
REMOTE_VOLUME_CONTROL = 0x010
|
||
ENHANCED_CALL_STATUS = 0x020
|
||
ENHANCED_CALL_CONTROL = 0x040
|
||
CODEC_NEGOTIATION = 0x080
|
||
HF_INDICATORS = 0x100
|
||
ESCO_S4_SETTINGS_SUPPORTED = 0x200
|
||
ENHANCED_VOICE_RECOGNITION_STATUS = 0x400
|
||
VOICE_RECOGNITION_TEST = 0x800
|
||
|
||
|
||
# AG supported features (+BRSF:) (normative).
|
||
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
|
||
# and 3GPP 27.007
|
||
class AgFeature(enum.IntFlag):
|
||
THREE_WAY_CALLING = 0x001
|
||
EC_NR = 0x002 # Echo Cancel & Noise reduction
|
||
VOICE_RECOGNITION_FUNCTION = 0x004
|
||
IN_BAND_RING_TONE_CAPABILITY = 0x008
|
||
VOICE_TAG = 0x010 # Attach a number to voice tag
|
||
REJECT_CALL = 0x020 # Ability to reject a call
|
||
ENHANCED_CALL_STATUS = 0x040
|
||
ENHANCED_CALL_CONTROL = 0x080
|
||
EXTENDED_ERROR_RESULT_CODES = 0x100
|
||
CODEC_NEGOTIATION = 0x200
|
||
HF_INDICATORS = 0x400
|
||
ESCO_S4_SETTINGS_SUPPORTED = 0x800
|
||
ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000
|
||
VOICE_RECOGNITION_TEST = 0x2000
|
||
|
||
|
||
# Audio Codec IDs (normative).
|
||
# Hands-Free Profile v1.8, 10 Appendix B
|
||
class AudioCodec(enum.IntEnum):
|
||
CVSD = 0x01 # Support for CVSD audio codec
|
||
MSBC = 0x02 # Support for mSBC audio codec
|
||
|
||
|
||
# HF Indicators (normative).
|
||
# Bluetooth Assigned Numbers, 6.10.1 HF Indicators
|
||
class HfIndicator(enum.IntEnum):
|
||
ENHANCED_SAFETY = 0x01 # Enhanced safety feature
|
||
BATTERY_LEVEL = 0x02 # Battery level feature
|
||
|
||
|
||
# Call Hold supported operations (normative).
|
||
# AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services
|
||
class CallHoldOperation(enum.IntEnum):
|
||
RELEASE_ALL_HELD_CALLS = 0 # Release all held calls
|
||
RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other
|
||
HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other
|
||
ADD_HELD_CALL = 3 # Adds a held call to conversation
|
||
|
||
|
||
# Response Hold status (normative).
|
||
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
|
||
# and 3GPP 27.007
|
||
class ResponseHoldStatus(enum.IntEnum):
|
||
INC_CALL_HELD = 0 # Put incoming call on hold
|
||
HELD_CALL_ACC = 1 # Accept a held incoming call
|
||
HELD_CALL_REJ = 2 # Reject a held incoming call
|
||
|
||
|
||
# Values for the Call Setup AG indicator (normative).
|
||
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
|
||
# and 3GPP 27.007
|
||
class CallSetupAgIndicator(enum.IntEnum):
|
||
NOT_IN_CALL_SETUP = 0
|
||
INCOMING_CALL_PROCESS = 1
|
||
OUTGOING_CALL_SETUP = 2
|
||
REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call
|
||
|
||
|
||
# Values for the Call Held AG indicator (normative).
|
||
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
|
||
# and 3GPP 27.007
|
||
class CallHeldAgIndicator(enum.IntEnum):
|
||
NO_CALLS_HELD = 0
|
||
# Call is placed on hold or active/held calls swapped
|
||
# (The AG has both an active AND a held call)
|
||
CALL_ON_HOLD_AND_ACTIVE_CALL = 1
|
||
CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call
|
||
|
||
|
||
# Call Info direction (normative).
|
||
# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
|
||
class CallInfoDirection(enum.IntEnum):
|
||
MOBILE_ORIGINATED_CALL = 0
|
||
MOBILE_TERMINATED_CALL = 1
|
||
|
||
|
||
# Call Info status (normative).
|
||
# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
|
||
class CallInfoStatus(enum.IntEnum):
|
||
ACTIVE = 0
|
||
HELD = 1
|
||
DIALING = 2
|
||
ALERTING = 3
|
||
INCOMING = 4
|
||
WAITING = 5
|
||
|
||
|
||
# Call Info mode (normative).
|
||
# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
|
||
class CallInfoMode(enum.IntEnum):
|
||
VOICE = 0
|
||
DATA = 1
|
||
FAX = 2
|
||
UNKNOWN = 9
|
||
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Hands-Free Control Interoperability Requirements
|
||
# -----------------------------------------------------------------------------
|
||
|
||
# Response codes.
|
||
RESPONSE_CODES = [
|
||
"+APLSIRI",
|
||
"+BAC",
|
||
"+BCC",
|
||
"+BCS",
|
||
"+BIA",
|
||
"+BIEV",
|
||
"+BIND",
|
||
"+BINP",
|
||
"+BLDN",
|
||
"+BRSF",
|
||
"+BTRH",
|
||
"+BVRA",
|
||
"+CCWA",
|
||
"+CHLD",
|
||
"+CHUP",
|
||
"+CIND",
|
||
"+CLCC",
|
||
"+CLIP",
|
||
"+CMEE",
|
||
"+CMER",
|
||
"+CNUM",
|
||
"+COPS",
|
||
"+IPHONEACCEV",
|
||
"+NREC",
|
||
"+VGM",
|
||
"+VGS",
|
||
"+VTS",
|
||
"+XAPL",
|
||
"A",
|
||
"D",
|
||
]
|
||
|
||
# Unsolicited responses and statuses.
|
||
UNSOLICITED_CODES = [
|
||
"+APLSIRI",
|
||
"+BCS",
|
||
"+BIND",
|
||
"+BSIR",
|
||
"+BTRH",
|
||
"+BVRA",
|
||
"+CCWA",
|
||
"+CIEV",
|
||
"+CLIP",
|
||
"+VGM",
|
||
"+VGS",
|
||
"BLACKLISTED",
|
||
"BUSY",
|
||
"DELAYED",
|
||
"NO ANSWER",
|
||
"NO CARRIER",
|
||
"RING",
|
||
]
|
||
|
||
# Status codes
|
||
STATUS_CODES = [
|
||
"+CME ERROR",
|
||
"BLACKLISTED",
|
||
"BUSY",
|
||
"DELAYED",
|
||
"ERROR",
|
||
"NO ANSWER",
|
||
"NO CARRIER",
|
||
"OK",
|
||
]
|
||
|
||
|
||
@dataclasses.dataclass
|
||
class Configuration:
|
||
supported_hf_features: List[HfFeature]
|
||
supported_hf_indicators: List[HfIndicator]
|
||
supported_audio_codecs: List[AudioCodec]
|
||
|
||
|
||
class AtResponseType(enum.Enum):
|
||
"""Indicate if a response is expected from an AT command, and if multiple
|
||
responses are accepted."""
|
||
|
||
NONE = 0
|
||
SINGLE = 1
|
||
MULTIPLE = 2
|
||
|
||
|
||
class AtResponse:
|
||
code: str
|
||
parameters: list
|
||
|
||
def __init__(self, response: bytearray):
|
||
code_and_parameters = response.split(b':')
|
||
parameters = (
|
||
code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray()
|
||
)
|
||
self.code = code_and_parameters[0].decode()
|
||
self.parameters = at.parse_parameters(parameters)
|
||
|
||
|
||
@dataclasses.dataclass
|
||
class AgIndicatorState:
|
||
description: str
|
||
index: int
|
||
supported_values: Set[int]
|
||
current_status: int
|
||
|
||
|
||
@dataclasses.dataclass
|
||
class HfIndicatorState:
|
||
supported: bool = False
|
||
enabled: bool = False
|
||
|
||
|
||
class HfProtocol:
|
||
"""Implementation for the Hands-Free side of the Hands-Free profile.
|
||
Reference specification Hands-Free Profile v1.8"""
|
||
|
||
supported_hf_features: int
|
||
supported_audio_codecs: List[AudioCodec]
|
||
|
||
supported_ag_features: int
|
||
supported_ag_call_hold_operations: List[CallHoldOperation]
|
||
|
||
ag_indicators: List[AgIndicatorState]
|
||
hf_indicators: Dict[HfIndicator, HfIndicatorState]
|
||
|
||
dlc: rfcomm.DLC
|
||
command_lock: asyncio.Lock
|
||
if TYPE_CHECKING:
|
||
response_queue: asyncio.Queue[AtResponse]
|
||
unsolicited_queue: asyncio.Queue[AtResponse]
|
||
else:
|
||
response_queue: asyncio.Queue
|
||
unsolicited_queue: asyncio.Queue
|
||
read_buffer: bytearray
|
||
|
||
def __init__(self, dlc: rfcomm.DLC, configuration: Configuration):
|
||
# Configure internal state.
|
||
self.dlc = dlc
|
||
self.command_lock = asyncio.Lock()
|
||
self.response_queue = asyncio.Queue()
|
||
self.unsolicited_queue = asyncio.Queue()
|
||
self.read_buffer = bytearray()
|
||
|
||
# Build local features.
|
||
self.supported_hf_features = sum(configuration.supported_hf_features)
|
||
self.supported_audio_codecs = configuration.supported_audio_codecs
|
||
|
||
self.hf_indicators = {
|
||
indicator: HfIndicatorState()
|
||
for indicator in configuration.supported_hf_indicators
|
||
}
|
||
|
||
# Clear remote features.
|
||
self.supported_ag_features = 0
|
||
self.supported_ag_call_hold_operations = []
|
||
self.ag_indicators = []
|
||
|
||
# Bind the AT reader to the RFCOMM channel.
|
||
self.dlc.sink = self._read_at
|
||
|
||
def supports_hf_feature(self, feature: HfFeature) -> bool:
|
||
return (self.supported_hf_features & feature) != 0
|
||
|
||
def supports_ag_feature(self, feature: AgFeature) -> bool:
|
||
return (self.supported_ag_features & feature) != 0
|
||
|
||
# Read AT messages from the RFCOMM channel.
|
||
# Enqueue AT commands, responses, unsolicited responses to their
|
||
# respective queues, and set the corresponding event.
|
||
def _read_at(self, data: bytes):
|
||
# Append to the read buffer.
|
||
self.read_buffer.extend(data)
|
||
|
||
# Locate header and trailer.
|
||
header = self.read_buffer.find(b'\r\n')
|
||
trailer = self.read_buffer.find(b'\r\n', header + 2)
|
||
if header == -1 or trailer == -1:
|
||
return
|
||
|
||
# Isolate the AT response code and parameters.
|
||
raw_response = self.read_buffer[header + 2 : trailer]
|
||
response = AtResponse(raw_response)
|
||
logger.debug(f"<<< {raw_response.decode()}")
|
||
|
||
# Consume the response bytes.
|
||
self.read_buffer = self.read_buffer[trailer + 2 :]
|
||
|
||
# Forward the received code to the correct queue.
|
||
if self.command_lock.locked() and (
|
||
response.code in STATUS_CODES or response.code in RESPONSE_CODES
|
||
):
|
||
self.response_queue.put_nowait(response)
|
||
elif response.code in UNSOLICITED_CODES:
|
||
self.unsolicited_queue.put_nowait(response)
|
||
else:
|
||
logger.warning(f"dropping unexpected response with code '{response.code}'")
|
||
|
||
# Send an AT command and wait for the peer response.
|
||
# Wait for the AT responses sent by the peer, to the status code.
|
||
# Raises asyncio.TimeoutError if the status is not received
|
||
# after a timeout (default 1 second).
|
||
# Raises ProtocolError if the status is not OK.
|
||
async def execute_command(
|
||
self,
|
||
cmd: str,
|
||
timeout: float = 1.0,
|
||
response_type: AtResponseType = AtResponseType.NONE,
|
||
) -> Union[None, AtResponse, List[AtResponse]]:
|
||
async with self.command_lock:
|
||
logger.debug(f">>> {cmd}")
|
||
self.dlc.write(cmd + '\r')
|
||
responses: List[AtResponse] = []
|
||
|
||
while True:
|
||
result = await asyncio.wait_for(
|
||
self.response_queue.get(), timeout=timeout
|
||
)
|
||
if result.code == 'OK':
|
||
if response_type == AtResponseType.SINGLE and len(responses) != 1:
|
||
raise HfpProtocolError("NO ANSWER")
|
||
|
||
if response_type == AtResponseType.MULTIPLE:
|
||
return responses
|
||
if response_type == AtResponseType.SINGLE:
|
||
return responses[0]
|
||
return None
|
||
if result.code in STATUS_CODES:
|
||
raise HfpProtocolError(result.code)
|
||
responses.append(result)
|
||
|
||
# 4.2.1 Service Level Connection Initialization.
|
||
async def initiate_slc(self):
|
||
# 4.2.1.1 Supported features exchange
|
||
# First, in the initialization procedure, the HF shall send the
|
||
# AT+BRSF=<HF supported features> command to the AG to both notify
|
||
# the AG of the supported features in the HF, as well as to retrieve the
|
||
# supported features in the AG using the +BRSF result code.
|
||
response = await self.execute_command(
|
||
f"AT+BRSF={self.supported_hf_features}", response_type=AtResponseType.SINGLE
|
||
)
|
||
|
||
self.supported_ag_features = int(response.parameters[0])
|
||
logger.info(f"supported AG features: {self.supported_ag_features}")
|
||
for feature in AgFeature:
|
||
if self.supports_ag_feature(feature):
|
||
logger.info(f" - {feature.name}")
|
||
|
||
# 4.2.1.2 Codec Negotiation
|
||
# Secondly, in the initialization procedure, if the HF supports the
|
||
# Codec Negotiation feature, it shall check if the AT+BRSF command
|
||
# response from the AG has indicated that it supports the Codec
|
||
# Negotiation feature.
|
||
if self.supports_hf_feature(
|
||
HfFeature.CODEC_NEGOTIATION
|
||
) and self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION):
|
||
# If both the HF and AG do support the Codec Negotiation feature
|
||
# then the HF shall send the AT+BAC=<HF available codecs> command to
|
||
# the AG to notify the AG of the available codecs in the HF.
|
||
codecs = [str(c) for c in self.supported_audio_codecs]
|
||
await self.execute_command(f"AT+BAC={','.join(codecs)}")
|
||
|
||
# 4.2.1.3 AG Indicators
|
||
# After having retrieved the supported features in the AG, the HF shall
|
||
# determine which indicators are supported by the AG, as well as the
|
||
# ordering of the supported indicators. This is because, according to
|
||
# the 3GPP 27.007 specification [2], the AG may support additional
|
||
# indicators not provided for by the Hands-Free Profile, and because the
|
||
# ordering of the indicators is implementation specific. The HF uses
|
||
# the AT+CIND=? Test command to retrieve information about the supported
|
||
# indicators and their ordering.
|
||
response = await self.execute_command(
|
||
"AT+CIND=?", response_type=AtResponseType.SINGLE
|
||
)
|
||
|
||
self.ag_indicators = []
|
||
for index, indicator in enumerate(response.parameters):
|
||
description = indicator[0].decode()
|
||
supported_values = []
|
||
for value in indicator[1]:
|
||
value = value.split(b'-')
|
||
value = [int(v) for v in value]
|
||
value_min = value[0]
|
||
value_max = value[1] if len(value) > 1 else value[0]
|
||
supported_values.extend([v for v in range(value_min, value_max + 1)])
|
||
|
||
self.ag_indicators.append(
|
||
AgIndicatorState(description, index, set(supported_values), 0)
|
||
)
|
||
|
||
# Once the HF has the necessary supported indicator and ordering
|
||
# information, it shall retrieve the current status of the indicators
|
||
# in the AG using the AT+CIND? Read command.
|
||
response = await self.execute_command(
|
||
"AT+CIND?", response_type=AtResponseType.SINGLE
|
||
)
|
||
|
||
for index, indicator in enumerate(response.parameters):
|
||
self.ag_indicators[index].current_status = int(indicator)
|
||
|
||
# After having retrieved the status of the indicators in the AG, the HF
|
||
# shall then enable the "Indicators status update" function in the AG by
|
||
# issuing the AT+CMER command, to which the AG shall respond with OK.
|
||
await self.execute_command("AT+CMER=3,,,1")
|
||
|
||
if self.supports_hf_feature(
|
||
HfFeature.THREE_WAY_CALLING
|
||
) and self.supports_ag_feature(HfFeature.THREE_WAY_CALLING):
|
||
# After the HF has enabled the “Indicators status update” function in
|
||
# the AG, and if the “Call waiting and 3-way calling” bit was set in the
|
||
# supported features bitmap by both the HF and the AG, the HF shall
|
||
# issue the AT+CHLD=? test command to retrieve the information about how
|
||
# the call hold and multiparty services are supported in the AG. The HF
|
||
# shall not issue the AT+CHLD=? test command in case either the HF or
|
||
# the AG does not support the "Three-way calling" feature.
|
||
response = await self.execute_command(
|
||
"AT+CHLD=?", response_type=AtResponseType.SINGLE
|
||
)
|
||
|
||
self.supported_ag_call_hold_operations = [
|
||
CallHoldOperation(int(operation))
|
||
for operation in response.parameters[0]
|
||
if not b'x' in operation
|
||
]
|
||
|
||
# 4.2.1.4 HF Indicators
|
||
# If the HF supports the HF indicator feature, it shall check the +BRSF
|
||
# response to see if the AG also supports the HF Indicator feature.
|
||
if self.supports_hf_feature(
|
||
HfFeature.HF_INDICATORS
|
||
) and self.supports_ag_feature(AgFeature.HF_INDICATORS):
|
||
# If both the HF and AG support the HF Indicator feature, then the HF
|
||
# shall send the AT+BIND=<HF supported HF indicators> command to the AG
|
||
# to notify the AG of the supported indicators’ assigned numbers in the
|
||
# HF. The AG shall respond with OK
|
||
indicators = [str(i) for i in self.hf_indicators.keys()]
|
||
await self.execute_command(f"AT+BIND={','.join(indicators)}")
|
||
|
||
# After having provided the AG with the HF indicators it supports,
|
||
# the HF shall send the AT+BIND=? to request HF indicators supported
|
||
# by the AG. The AG shall reply with the +BIND response listing all
|
||
# HF indicators that it supports followed by an OK.
|
||
response = await self.execute_command(
|
||
"AT+BIND=?", response_type=AtResponseType.SINGLE
|
||
)
|
||
|
||
logger.info("supported HF indicators:")
|
||
for indicator in response.parameters[0]:
|
||
indicator = HfIndicator(int(indicator))
|
||
logger.info(f" - {indicator.name}")
|
||
if indicator in self.hf_indicators:
|
||
self.hf_indicators[indicator].supported = True
|
||
|
||
# Once the HF receives the supported HF indicators list from the AG,
|
||
# the HF shall send the AT+BIND? command to determine which HF
|
||
# indicators are enabled. The AG shall respond with one or more
|
||
# +BIND responses. The AG shall terminate the list with OK.
|
||
# (See Section 4.36.1.3).
|
||
responses = await self.execute_command(
|
||
"AT+BIND?", response_type=AtResponseType.MULTIPLE
|
||
)
|
||
|
||
logger.info("enabled HF indicators:")
|
||
for response in responses:
|
||
indicator = HfIndicator(int(response.parameters[0]))
|
||
enabled = int(response.parameters[1]) != 0
|
||
logger.info(f" - {indicator.name}: {enabled}")
|
||
if indicator in self.hf_indicators:
|
||
self.hf_indicators[indicator].enabled = True
|
||
|
||
logger.info("SLC setup completed")
|
||
|
||
# 4.11.2 Audio Connection Setup by HF
|
||
async def setup_audio_connection(self):
|
||
# When the HF triggers the establishment of the Codec Connection it
|
||
# shall send the AT command AT+BCC to the AG. The AG shall respond with
|
||
# OK if it will start the Codec Connection procedure, and with ERROR
|
||
# if it cannot start the Codec Connection procedure.
|
||
await self.execute_command("AT+BCC")
|
||
|
||
# 4.11.3 Codec Connection Setup
|
||
async def setup_codec_connection(self, codec_id: int):
|
||
# The AG shall send a +BCS=<Codec ID> unsolicited response to the HF.
|
||
# The HF shall then respond to the incoming unsolicited response with
|
||
# the AT command AT+BCS=<Codec ID>. The ID shall be the same as in the
|
||
# unsolicited response code as long as the ID is supported.
|
||
# If the received ID is not available, the HF shall respond with
|
||
# AT+BAC with its available codecs.
|
||
if codec_id not in self.supported_audio_codecs:
|
||
codecs = [str(c) for c in self.supported_audio_codecs]
|
||
await self.execute_command(f"AT+BAC={','.join(codecs)}")
|
||
return
|
||
|
||
await self.execute_command(f"AT+BCS={codec_id}")
|
||
|
||
# After sending the OK response, the AG shall open the
|
||
# Synchronous Connection with the settings that are determined by the
|
||
# ID. The HF shall be ready to accept the synchronous connection
|
||
# establishment as soon as it has sent the AT commands AT+BCS=<Codec ID>.
|
||
|
||
logger.info("codec connection setup completed")
|
||
|
||
# 4.13.1 Answer Incoming Call from the HF – In-Band Ringing
|
||
async def answer_incoming_call(self):
|
||
# The user accepts the incoming voice call by using the proper means
|
||
# provided by the HF. The HF shall then send the ATA command
|
||
# (see Section 4.34) to the AG. The AG shall then begin the procedure for
|
||
# accepting the incoming call.
|
||
await self.execute_command("ATA")
|
||
|
||
# 4.14.1 Reject an Incoming Call from the HF
|
||
async def reject_incoming_call(self):
|
||
# The user rejects the incoming call by using the User Interface on the
|
||
# Hands-Free unit. The HF shall then send the AT+CHUP command
|
||
# (see Section 4.34) to the AG. This may happen at any time during the
|
||
# procedures described in Sections 4.13.1 and 4.13.2.
|
||
await self.execute_command("AT+CHUP")
|
||
|
||
# 4.15.1 Terminate a Call Process from the HF
|
||
async def terminate_call(self):
|
||
# The user may abort the ongoing call process using whatever means
|
||
# provided by the Hands-Free unit. The HF shall send AT+CHUP command
|
||
# (see Section 4.34) to the AG, and the AG shall then start the
|
||
# procedure to terminate or interrupt the current call procedure.
|
||
# The AG shall then send the OK indication followed by the +CIEV result
|
||
# code, with the value indicating (call=0).
|
||
await self.execute_command("AT+CHUP")
|
||
|
||
async def update_ag_indicator(self, index: int, value: int):
|
||
self.ag_indicators[index].current_status = value
|
||
logger.info(
|
||
f"AG indicator updated: {self.ag_indicators[index].description}, {value}"
|
||
)
|
||
|
||
async def handle_unsolicited(self):
|
||
"""Handle unsolicited result codes sent by the audio gateway."""
|
||
result = await self.unsolicited_queue.get()
|
||
if result.code == "+BCS":
|
||
await self.setup_codec_connection(int(result.parameters[0]))
|
||
elif result.code == "+CIEV":
|
||
await self.update_ag_indicator(
|
||
int(result.parameters[0]), int(result.parameters[1])
|
||
)
|
||
else:
|
||
logging.info(f"unhandled unsolicited response {result.code}")
|
||
|
||
async def run(self):
|
||
"""Main rountine for the Hands-Free side of the HFP protocol.
|
||
Initiates the service level connection then loops handling
|
||
unsolicited AG responses."""
|
||
|
||
try:
|
||
await self.initiate_slc()
|
||
while True:
|
||
await self.handle_unsolicited()
|
||
except Exception:
|
||
logger.error("HFP-HF protocol failed with the following error:")
|
||
logger.error(traceback.format_exc())
|
||
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Normative SDP definitions
|
||
# -----------------------------------------------------------------------------
|
||
|
||
|
||
# Profile version (normative).
|
||
# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
|
||
class ProfileVersion(enum.IntEnum):
|
||
V1_5 = 0x0105
|
||
V1_6 = 0x0106
|
||
V1_7 = 0x0107
|
||
V1_8 = 0x0108
|
||
V1_9 = 0x0109
|
||
|
||
|
||
# HF supported features (normative).
|
||
# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
|
||
class HfSdpFeature(enum.IntFlag):
|
||
EC_NR = 0x01 # Echo Cancel & Noise reduction
|
||
THREE_WAY_CALLING = 0x02
|
||
CLI_PRESENTATION_CAPABILITY = 0x04
|
||
VOICE_RECOGNITION_ACTIVATION = 0x08
|
||
REMOTE_VOLUME_CONTROL = 0x10
|
||
WIDE_BAND = 0x20 # Wide band speech
|
||
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
|
||
VOICE_RECOGNITION_TEST = 0x80
|
||
|
||
|
||
# AG supported features (normative).
|
||
# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
|
||
class AgSdpFeature(enum.IntFlag):
|
||
THREE_WAY_CALLING = 0x01
|
||
EC_NR = 0x02 # Echo Cancel & Noise reduction
|
||
VOICE_RECOGNITION_FUNCTION = 0x04
|
||
IN_BAND_RING_TONE_CAPABILITY = 0x08
|
||
VOICE_TAG = 0x10 # Attach a number to voice tag
|
||
WIDE_BAND = 0x20 # Wide band speech
|
||
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
|
||
VOICE_RECOGNITION_TEST = 0x80
|
||
|
||
|
||
def sdp_records(
|
||
service_record_handle: int, rfcomm_channel: int, configuration: Configuration
|
||
) -> List[ServiceAttribute]:
|
||
"""Generate the SDP record for HFP Hands-Free support.
|
||
The record exposes the features supported in the input configuration,
|
||
and the allocated RFCOMM channel."""
|
||
|
||
hf_supported_features = 0
|
||
|
||
if HfFeature.EC_NR in configuration.supported_hf_features:
|
||
hf_supported_features |= HfSdpFeature.EC_NR
|
||
if HfFeature.THREE_WAY_CALLING in configuration.supported_hf_features:
|
||
hf_supported_features |= HfSdpFeature.THREE_WAY_CALLING
|
||
if HfFeature.CLI_PRESENTATION_CAPABILITY in configuration.supported_hf_features:
|
||
hf_supported_features |= HfSdpFeature.CLI_PRESENTATION_CAPABILITY
|
||
if HfFeature.VOICE_RECOGNITION_ACTIVATION in configuration.supported_hf_features:
|
||
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_ACTIVATION
|
||
if HfFeature.REMOTE_VOLUME_CONTROL in configuration.supported_hf_features:
|
||
hf_supported_features |= HfSdpFeature.REMOTE_VOLUME_CONTROL
|
||
if (
|
||
HfFeature.ENHANCED_VOICE_RECOGNITION_STATUS
|
||
in configuration.supported_hf_features
|
||
):
|
||
hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
|
||
if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features:
|
||
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST
|
||
|
||
if AudioCodec.MSBC in configuration.supported_audio_codecs:
|
||
hf_supported_features |= HfSdpFeature.WIDE_BAND
|
||
|
||
return [
|
||
ServiceAttribute(
|
||
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
||
DataElement.unsigned_integer_32(service_record_handle),
|
||
),
|
||
ServiceAttribute(
|
||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||
DataElement.sequence(
|
||
[
|
||
DataElement.uuid(BT_HANDSFREE_SERVICE),
|
||
DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
|
||
]
|
||
),
|
||
),
|
||
ServiceAttribute(
|
||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||
DataElement.sequence(
|
||
[
|
||
DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
|
||
DataElement.sequence(
|
||
[
|
||
DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
|
||
DataElement.unsigned_integer_8(rfcomm_channel),
|
||
]
|
||
),
|
||
]
|
||
),
|
||
),
|
||
ServiceAttribute(
|
||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||
DataElement.sequence(
|
||
[
|
||
DataElement.sequence(
|
||
[
|
||
DataElement.uuid(BT_HANDSFREE_SERVICE),
|
||
DataElement.unsigned_integer_16(ProfileVersion.V1_8),
|
||
]
|
||
)
|
||
]
|
||
),
|
||
),
|
||
ServiceAttribute(
|
||
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
|
||
DataElement.unsigned_integer_16(hf_supported_features),
|
||
),
|
||
]
|