Merge pull request #475 from zxzxwu/hfp-ag

Add more HFP command suppport
This commit is contained in:
zxzxwu
2024-04-26 12:01:20 +08:00
committed by GitHub
2 changed files with 428 additions and 25 deletions

View File

@@ -204,17 +204,22 @@ class HfIndicator(enum.IntEnum):
BATTERY_LEVEL = 0x02 # Battery level feature
class CallHoldOperation(enum.IntEnum):
class CallHoldOperation(enum.Enum):
"""
Call Hold supported operations (normative).
AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services.
"""
RELEASE_ALL_HELD_CALLS = 0 # Release all held calls
RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other
HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other
ADD_HELD_CALL = 3 # Adds a held call to conversation
RELEASE_ALL_HELD_CALLS = "0" # Release all held calls
RELEASE_ALL_ACTIVE_CALLS = "1" # Release all active calls, accept other
RELEASE_SPECIFIC_CALL = "1x" # Release a specific call X
HOLD_ALL_ACTIVE_CALLS = "2" # Place all active calls on hold, accept other
HOLD_ALL_CALLS_EXCEPT = "2x" # Place all active calls except call X
ADD_HELD_CALL = "3" # Adds a held call to conversation
CONNECT_TWO_CALLS = (
"4" # Connects the two calls and disconnects the subscriber from both calls
)
class ResponseHoldStatus(enum.IntEnum):
@@ -335,10 +340,82 @@ class CallInfo:
status: CallInfoStatus
mode: CallInfoMode
multi_party: CallInfoMultiParty
number: Optional[int] = None
number: Optional[str] = None
type: Optional[int] = None
@dataclasses.dataclass
class CallLineIdentification:
"""
Calling Line Identification notification.
TS 127 007 - V6.8.0, 7.6 Calling line identification presentation +CLIP, but only
number, type and alpha are meaningful in HFP.
Attributes:
number: String type phone number of format specified by `type`.
type: Type of address octet in integer format (refer TS 24.008 [8] subclause
10.5.4.7).
subaddr: String type subaddress of format specified by `satype`.
satype: Type of subaddress octet in integer format (refer TS 24.008 [8]
subclause 10.5.4.8).
alpha: Optional string type alphanumeric representation of number corresponding
to the entry found in phonebook; used character set should be the one selected
with command Select TE Character Set +CSCS.
cli_validity: 0 CLI valid, 1 CLI has been withheld by the originator, 2 CLI is
not available due to interworking problems or limitations of originating
network.
"""
number: str
type: int
subaddr: Optional[str] = None
satype: Optional[int] = None
alpha: Optional[str] = None
cli_validity: Optional[int] = None
@classmethod
def parse_from(cls: Type[Self], parameters: List[bytes]) -> Self:
return cls(
number=parameters[0].decode(),
type=int(parameters[1]),
subaddr=parameters[2].decode() if len(parameters) >= 3 else None,
satype=(
int(parameters[3]) if len(parameters) >= 4 and parameters[3] else None
),
alpha=parameters[4].decode() if len(parameters) >= 5 else None,
cli_validity=(
int(parameters[5]) if len(parameters) >= 6 and parameters[5] else None
),
)
def to_clip_string(self) -> str:
return ','.join(
str(arg) if arg else ''
for arg in [
self.number,
self.type,
self.subaddr,
self.satype,
self.alpha,
self.cli_validity,
]
)
class VoiceRecognitionState(enum.IntEnum):
"""
vrec values provided in AT+BVRA command.
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
DISABLE = 0
ENABLE = 1
# (Enhanced Voice Recognition Status only) HF is ready to accept audio.
ENHANCED_READY = 2
class CmeError(enum.IntEnum):
"""
CME ERROR codes (partial listed).
@@ -359,7 +436,7 @@ class CmeError(enum.IntEnum):
# -----------------------------------------------------------------------------
# Response codes.
RESPONSE_CODES = [
RESPONSE_CODES = {
"+APLSIRI",
"+BAC",
"+BCC",
@@ -390,10 +467,10 @@ RESPONSE_CODES = [
"+XAPL",
"A",
"D",
]
}
# Unsolicited responses and statuses.
UNSOLICITED_CODES = [
UNSOLICITED_CODES = {
"+APLSIRI",
"+BCS",
"+BIND",
@@ -411,10 +488,10 @@ UNSOLICITED_CODES = [
"NO ANSWER",
"NO CARRIER",
"RING",
]
}
# Status codes
STATUS_CODES = [
STATUS_CODES = {
"+CME ERROR",
"BLACKLISTED",
"BUSY",
@@ -423,7 +500,7 @@ STATUS_CODES = [
"NO ANSWER",
"NO CARRIER",
"OK",
]
}
@dataclasses.dataclass
@@ -626,10 +703,25 @@ class HfProtocol(pyee.EventEmitter):
ag_indicator: When AG update their indicators, notify the new state.
Args:
ag_indicator: AgIndicator
speaker_volume: Emitted when AG update speaker volume autonomously.
Args:
volume: Int
microphone_volume: Emitted when AG update microphone volume autonomously.
Args:
volume: Int
microphone_volume: Emitted when AG sends a ringtone request.
Args:
None
cli_notification: Emitted when notify the call metadata on line.
Args:
cli_notification: CallLineIdentification
voice_recognition: Emitted when AG starts voice recognition autonomously.
Args:
vrec: VoiceRecognitionState
"""
class HfLoopTermination(HfpProtocolError): ...
"""Termination signal for run() loop."""
class HfLoopTermination(HfpProtocolError):
"""Termination signal for run() loop."""
supported_hf_features: int
supported_audio_codecs: List[AudioCodec]
@@ -651,7 +743,11 @@ class HfProtocol(pyee.EventEmitter):
read_buffer: bytearray
active_codec: AudioCodec
def __init__(self, dlc: rfcomm.DLC, configuration: HfConfiguration) -> None:
def __init__(
self,
dlc: rfcomm.DLC,
configuration: HfConfiguration,
) -> None:
super().__init__()
# Configure internal state.
@@ -841,7 +937,7 @@ class HfProtocol(pyee.EventEmitter):
if self.supports_hf_feature(
HfFeature.THREE_WAY_CALLING
) and self.supports_ag_feature(HfFeature.THREE_WAY_CALLING):
) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
# After the HF has enabled the “Indicators status update” function in
# the AG, and if the “Call waiting and 3-way calling” bit was set in the
# supported features bitmap by both the HF and the AG, the HF shall
@@ -854,9 +950,8 @@ class HfProtocol(pyee.EventEmitter):
)
self.supported_ag_call_hold_operations = [
CallHoldOperation(int(operation))
for operation in response.parameters[0]
if not b'x' in operation
CallHoldOperation(operation.decode())
for operation in response.parameters
]
# 4.2.1.4 HF Indicators
@@ -987,7 +1082,7 @@ class HfProtocol(pyee.EventEmitter):
multi_party=CallInfoMultiParty(int(response.parameters[4])),
)
if len(response.parameters) >= 7:
call_info.number = int(response.parameters[5])
call_info.number = response.parameters[5]
call_info.type = int(response.parameters[6])
calls.append(call_info)
return calls
@@ -1010,6 +1105,21 @@ class HfProtocol(pyee.EventEmitter):
await self.update_ag_indicator(
int(result.parameters[0]), int(result.parameters[1])
)
elif result.code == "+VGS":
self.emit('speaker_volume', int(result.parameters[0]))
elif result.code == "+VGM":
self.emit('microphone_volume', int(result.parameters[0]))
elif result.code == "RING":
self.emit('ring')
elif result.code == "+CLIP":
self.emit(
'cli_notification', CallLineIdentification.parse_from(result.parameters)
)
elif result.code == "+BVRA":
# TODO: Support Enhanced Voice Recognition.
self.emit(
'voice_recognition', VoiceRecognitionState(int(result.parameters[0]))
)
else:
logging.info(f"unhandled unsolicited response {result.code}")
@@ -1050,6 +1160,14 @@ class AgProtocol(pyee.EventEmitter):
answer: Emit when HF sends ATA to answer phone call.
hang_up: Emit when HF sends AT+CHUP to hang up phone call.
dial: Emit when HF sends ATD to dial phone call.
voice_recognition: Emit when HF requests voice recognition state.
Args:
vrec: VoiceRecognitionState
call_hold: Emit when HF requests call hold operation.
Args:
operation: CallHoldOperation
call_index: Optional[int]
"""
supported_hf_features: int
@@ -1066,10 +1184,12 @@ class AgProtocol(pyee.EventEmitter):
read_buffer: bytearray
active_codec: AudioCodec
calls: List[CallInfo]
indicator_report_enabled: bool
inband_ringtone_enabled: bool
cme_error_enabled: bool
cli_notification_enabled: bool
_remained_slc_setup_features: Set[HfFeature]
def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
@@ -1079,6 +1199,7 @@ class AgProtocol(pyee.EventEmitter):
self.dlc = dlc
self.read_buffer = bytearray()
self.active_codec = AudioCodec.CVSD
self.calls = []
# Build local features.
self.supported_ag_features = sum(configuration.supported_ag_features)
@@ -1095,6 +1216,7 @@ class AgProtocol(pyee.EventEmitter):
self.supported_audio_codecs = []
self.indicator_report_enabled = False
self.cme_error_enabled = False
self.cli_notification_enabled = False
self.hf_indicators = collections.OrderedDict()
@@ -1168,6 +1290,21 @@ class AgProtocol(pyee.EventEmitter):
self.inband_ringtone_enabled = enabled
self.send_response(f'+BSIR: {1 if enabled else 0}')
def set_speaker_volume(self, level: int) -> None:
"""Reports speaker volume."""
self.send_response(f'+VGS: {level}')
def set_microphone_volume(self, level: int) -> None:
"""Reports microphone volume."""
self.send_response(f'+VGM: {level}')
def send_ring(self) -> None:
"""Sends RING command to trigger ringtone on HF."""
self.send_response('RING')
def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None:
"""Updates AG indicator.
@@ -1212,6 +1349,14 @@ class AgProtocol(pyee.EventEmitter):
if (new_codec := await at_bcs_future) != codec:
raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}')
def send_cli_notification(self, cli: CallLineIdentification) -> None:
"""Sends +CLIP CLI notification."""
if not self.cli_notification_enabled:
logger.warning('Try to send CLIP while CLI notification is not enabled')
self.send_response(f'+CLIP: {cli.to_clip_string()}')
def _check_remained_slc_commands(self) -> None:
if not self._remained_slc_setup_features:
self.emit('slc_complete')
@@ -1240,6 +1385,52 @@ class AgProtocol(pyee.EventEmitter):
self.send_ok()
self.emit('codec_negotiation', self.active_codec)
def _on_bvra(self, vrec: bytes) -> None:
self.send_ok()
self.emit('voice_recognition', VoiceRecognitionState(int(vrec)))
def _on_chld(self, operation_code: bytes) -> None:
call_index: Optional[int] = None
if len(operation_code) > 1:
call_index = int(operation_code[1:])
operation_code = operation_code[:1] + b'x'
try:
operation = CallHoldOperation(operation_code.decode())
except:
logger.error(f'Invalid operation: {operation_code.decode()}')
self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED)
return
if operation not in self.supported_ag_call_hold_operations:
logger.error(f'Unsupported operation: {operation_code.decode()}')
self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED)
if call_index is not None and not any(
call.index == call_index for call in self.calls
):
logger.error(f'No matching call {call_index}')
self.send_cme_error(CmeError.INVALID_INDEX)
# Real three-way calls have more complicated situations, but this is not a popular issue - let users to handle the remaining :)
self.send_ok()
self.emit('call_hold', operation, call_index)
def _on_chld_test(self) -> None:
if not self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
self.send_error()
return
self.send_response(
'+CHLD:'
+ ','.join(
operation.value for operation in self.supported_ag_call_hold_operations
)
)
self.send_ok()
self._remained_slc_setup_features.remove(HfFeature.THREE_WAY_CALLING)
self._check_remained_slc_commands()
def _on_cind_test(self) -> None:
if not self.ag_indicators:
self.send_cme_error(CmeError.NOT_FOUND)
@@ -1364,6 +1555,26 @@ class AgProtocol(pyee.EventEmitter):
self.emit('hang_up')
self.send_ok()
def _on_clcc(self) -> None:
for call in self.calls:
response = (
f'+CLCC: {call.index}'
f',{call.direction.value}'
f',{call.status.value}'
f',{call.mode.value}'
f',{call.multi_party.value}'
f',\"{call.number}\"'
if call.number is not None
else '' f',{call.type}' if call.type is not None else ''
)
self.send_response(response)
self.send_ok()
def _on_clip(self, enabled: bytes) -> None:
if not self.supports_hf_feature(HfFeature.CLI_PRESENTATION_CAPABILITY):
logger.error('Remote doesn not support CLI but sends AT+CLIP')
self.cli_notification_enabled = True if enabled == b'1' else False
# -----------------------------------------------------------------------------
# Normative SDP definitions
@@ -1596,7 +1807,7 @@ async def find_hf_sdp_record(
connection: ACL connection to make SDP search.
Returns:
Dictionary mapping from channel number to service class UUID list.
Tuple of (<RFCOMM channel>, <Profile Version>, <HF SDP features>)
"""
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
@@ -1640,7 +1851,7 @@ async def find_ag_sdp_record(
connection: ACL connection to make SDP search.
Returns:
Dictionary mapping from channel number to service class UUID list.
Tuple of (<RFCOMM channel>, <Profile Version>, <AG SDP features>)
"""
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(