mirror of
https://github.com/google/bumble.git
synced 2026-05-08 03:58:01 +00:00
Add more HFP command suppport
* Support all Call Hold Operation * Support CLI Presentation * Support Voice Recognition * Support RING and Volume Changes * [AG] Support Enhanced Call Status * Minor fixes
This commit is contained in:
255
bumble/hfp.py
255
bumble/hfp.py
@@ -204,17 +204,22 @@ class HfIndicator(enum.IntEnum):
|
|||||||
BATTERY_LEVEL = 0x02 # Battery level feature
|
BATTERY_LEVEL = 0x02 # Battery level feature
|
||||||
|
|
||||||
|
|
||||||
class CallHoldOperation(enum.IntEnum):
|
class CallHoldOperation(enum.Enum):
|
||||||
"""
|
"""
|
||||||
Call Hold supported operations (normative).
|
Call Hold supported operations (normative).
|
||||||
|
|
||||||
AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services.
|
AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
RELEASE_ALL_HELD_CALLS = 0 # Release all held calls
|
RELEASE_ALL_HELD_CALLS = "0" # Release all held calls
|
||||||
RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other
|
RELEASE_ALL_ACTIVE_CALLS = "1" # Release all active calls, accept other
|
||||||
HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other
|
RELEASE_SPECIFIC_CALL = "1x" # Release a specific call X
|
||||||
ADD_HELD_CALL = 3 # Adds a held call to conversation
|
HOLD_ALL_ACTIVE_CALLS = "2" # Place all active calls on hold, accept other
|
||||||
|
HOLD_ALL_CALLS_EXCEPT = "2x" # Place all active calls except call X
|
||||||
|
ADD_HELD_CALL = "3" # Adds a held call to conversation
|
||||||
|
CONNECT_TWO_CALLS = (
|
||||||
|
"4" # Connects the two calls and disconnects the subscriber from both calls
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ResponseHoldStatus(enum.IntEnum):
|
class ResponseHoldStatus(enum.IntEnum):
|
||||||
@@ -335,10 +340,82 @@ class CallInfo:
|
|||||||
status: CallInfoStatus
|
status: CallInfoStatus
|
||||||
mode: CallInfoMode
|
mode: CallInfoMode
|
||||||
multi_party: CallInfoMultiParty
|
multi_party: CallInfoMultiParty
|
||||||
number: Optional[int] = None
|
number: Optional[str] = None
|
||||||
type: Optional[int] = None
|
type: Optional[int] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class CallLineIdentification:
|
||||||
|
"""
|
||||||
|
Calling Line Identification notification.
|
||||||
|
|
||||||
|
TS 127 007 - V6.8.0, 7.6 Calling line identification presentation +CLIP, but only
|
||||||
|
number, type and alpha are meaningful in HFP.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
number: String type phone number of format specified by `type`.
|
||||||
|
type: Type of address octet in integer format (refer TS 24.008 [8] subclause
|
||||||
|
10.5.4.7).
|
||||||
|
subaddr: String type subaddress of format specified by `satype`.
|
||||||
|
satype: Type of subaddress octet in integer format (refer TS 24.008 [8]
|
||||||
|
subclause 10.5.4.8).
|
||||||
|
alpha: Optional string type alphanumeric representation of number corresponding
|
||||||
|
to the entry found in phonebook; used character set should be the one selected
|
||||||
|
with command Select TE Character Set +CSCS.
|
||||||
|
cli_validity: 0 CLI valid, 1 CLI has been withheld by the originator, 2 CLI is
|
||||||
|
not available due to interworking problems or limitations of originating
|
||||||
|
network.
|
||||||
|
"""
|
||||||
|
|
||||||
|
number: str
|
||||||
|
type: int
|
||||||
|
subaddr: Optional[str] = None
|
||||||
|
satype: Optional[int] = None
|
||||||
|
alpha: Optional[str] = None
|
||||||
|
cli_validity: Optional[int] = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse_from(cls: Type[Self], parameters: List[bytes]) -> Self:
|
||||||
|
return cls(
|
||||||
|
number=parameters[0].decode(),
|
||||||
|
type=int(parameters[1]),
|
||||||
|
subaddr=parameters[2].decode() if len(parameters) >= 3 else None,
|
||||||
|
satype=(
|
||||||
|
int(parameters[3]) if len(parameters) >= 4 and parameters[3] else None
|
||||||
|
),
|
||||||
|
alpha=parameters[4].decode() if len(parameters) >= 5 else None,
|
||||||
|
cli_validity=(
|
||||||
|
int(parameters[5]) if len(parameters) >= 6 and parameters[5] else None
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def to_clip_string(self) -> str:
|
||||||
|
return ','.join(
|
||||||
|
str(arg) if arg else ''
|
||||||
|
for arg in [
|
||||||
|
self.number,
|
||||||
|
self.type,
|
||||||
|
self.subaddr,
|
||||||
|
self.satype,
|
||||||
|
self.alpha,
|
||||||
|
self.cli_validity,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class VoiceRecognitionState(enum.IntEnum):
|
||||||
|
"""
|
||||||
|
vrec values provided in AT+BVRA command.
|
||||||
|
|
||||||
|
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
|
||||||
|
"""
|
||||||
|
|
||||||
|
DISABLE = 0
|
||||||
|
ENABLE = 1
|
||||||
|
# (Enhanced Voice Recognition Status only) HF is ready to accept audio.
|
||||||
|
ENHANCED_READY = 2
|
||||||
|
|
||||||
|
|
||||||
class CmeError(enum.IntEnum):
|
class CmeError(enum.IntEnum):
|
||||||
"""
|
"""
|
||||||
CME ERROR codes (partial listed).
|
CME ERROR codes (partial listed).
|
||||||
@@ -359,7 +436,7 @@ class CmeError(enum.IntEnum):
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
# Response codes.
|
# Response codes.
|
||||||
RESPONSE_CODES = [
|
RESPONSE_CODES = {
|
||||||
"+APLSIRI",
|
"+APLSIRI",
|
||||||
"+BAC",
|
"+BAC",
|
||||||
"+BCC",
|
"+BCC",
|
||||||
@@ -390,10 +467,10 @@ RESPONSE_CODES = [
|
|||||||
"+XAPL",
|
"+XAPL",
|
||||||
"A",
|
"A",
|
||||||
"D",
|
"D",
|
||||||
]
|
}
|
||||||
|
|
||||||
# Unsolicited responses and statuses.
|
# Unsolicited responses and statuses.
|
||||||
UNSOLICITED_CODES = [
|
UNSOLICITED_CODES = {
|
||||||
"+APLSIRI",
|
"+APLSIRI",
|
||||||
"+BCS",
|
"+BCS",
|
||||||
"+BIND",
|
"+BIND",
|
||||||
@@ -411,10 +488,10 @@ UNSOLICITED_CODES = [
|
|||||||
"NO ANSWER",
|
"NO ANSWER",
|
||||||
"NO CARRIER",
|
"NO CARRIER",
|
||||||
"RING",
|
"RING",
|
||||||
]
|
}
|
||||||
|
|
||||||
# Status codes
|
# Status codes
|
||||||
STATUS_CODES = [
|
STATUS_CODES = {
|
||||||
"+CME ERROR",
|
"+CME ERROR",
|
||||||
"BLACKLISTED",
|
"BLACKLISTED",
|
||||||
"BUSY",
|
"BUSY",
|
||||||
@@ -423,7 +500,7 @@ STATUS_CODES = [
|
|||||||
"NO ANSWER",
|
"NO ANSWER",
|
||||||
"NO CARRIER",
|
"NO CARRIER",
|
||||||
"OK",
|
"OK",
|
||||||
]
|
}
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
@@ -626,10 +703,25 @@ class HfProtocol(pyee.EventEmitter):
|
|||||||
ag_indicator: When AG update their indicators, notify the new state.
|
ag_indicator: When AG update their indicators, notify the new state.
|
||||||
Args:
|
Args:
|
||||||
ag_indicator: AgIndicator
|
ag_indicator: AgIndicator
|
||||||
|
speaker_volume: Emitted when AG update speaker volume autonomously.
|
||||||
|
Args:
|
||||||
|
volume: Int
|
||||||
|
microphone_volume: Emitted when AG update microphone volume autonomously.
|
||||||
|
Args:
|
||||||
|
volume: Int
|
||||||
|
microphone_volume: Emitted when AG sends a ringtone request.
|
||||||
|
Args:
|
||||||
|
None
|
||||||
|
cli_notification: Emitted when notify the call metadata on line.
|
||||||
|
Args:
|
||||||
|
cli_notification: CallLineIdentification
|
||||||
|
voice_recognition: Emitted when AG starts voice recognition autonomously.
|
||||||
|
Args:
|
||||||
|
vrec: VoiceRecognitionState
|
||||||
"""
|
"""
|
||||||
|
|
||||||
class HfLoopTermination(HfpProtocolError): ...
|
class HfLoopTermination(HfpProtocolError):
|
||||||
"""Termination signal for run() loop."""
|
"""Termination signal for run() loop."""
|
||||||
|
|
||||||
supported_hf_features: int
|
supported_hf_features: int
|
||||||
supported_audio_codecs: List[AudioCodec]
|
supported_audio_codecs: List[AudioCodec]
|
||||||
@@ -651,7 +743,11 @@ class HfProtocol(pyee.EventEmitter):
|
|||||||
read_buffer: bytearray
|
read_buffer: bytearray
|
||||||
active_codec: AudioCodec
|
active_codec: AudioCodec
|
||||||
|
|
||||||
def __init__(self, dlc: rfcomm.DLC, configuration: HfConfiguration) -> None:
|
def __init__(
|
||||||
|
self,
|
||||||
|
dlc: rfcomm.DLC,
|
||||||
|
configuration: HfConfiguration,
|
||||||
|
) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
# Configure internal state.
|
# Configure internal state.
|
||||||
@@ -841,7 +937,7 @@ class HfProtocol(pyee.EventEmitter):
|
|||||||
|
|
||||||
if self.supports_hf_feature(
|
if self.supports_hf_feature(
|
||||||
HfFeature.THREE_WAY_CALLING
|
HfFeature.THREE_WAY_CALLING
|
||||||
) and self.supports_ag_feature(HfFeature.THREE_WAY_CALLING):
|
) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
|
||||||
# After the HF has enabled the “Indicators status update” function in
|
# After the HF has enabled the “Indicators status update” function in
|
||||||
# the AG, and if the “Call waiting and 3-way calling” bit was set in the
|
# the AG, and if the “Call waiting and 3-way calling” bit was set in the
|
||||||
# supported features bitmap by both the HF and the AG, the HF shall
|
# supported features bitmap by both the HF and the AG, the HF shall
|
||||||
@@ -854,9 +950,8 @@ class HfProtocol(pyee.EventEmitter):
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.supported_ag_call_hold_operations = [
|
self.supported_ag_call_hold_operations = [
|
||||||
CallHoldOperation(int(operation))
|
CallHoldOperation(operation.decode())
|
||||||
for operation in response.parameters[0]
|
for operation in response.parameters
|
||||||
if not b'x' in operation
|
|
||||||
]
|
]
|
||||||
|
|
||||||
# 4.2.1.4 HF Indicators
|
# 4.2.1.4 HF Indicators
|
||||||
@@ -987,7 +1082,7 @@ class HfProtocol(pyee.EventEmitter):
|
|||||||
multi_party=CallInfoMultiParty(int(response.parameters[4])),
|
multi_party=CallInfoMultiParty(int(response.parameters[4])),
|
||||||
)
|
)
|
||||||
if len(response.parameters) >= 7:
|
if len(response.parameters) >= 7:
|
||||||
call_info.number = int(response.parameters[5])
|
call_info.number = response.parameters[5]
|
||||||
call_info.type = int(response.parameters[6])
|
call_info.type = int(response.parameters[6])
|
||||||
calls.append(call_info)
|
calls.append(call_info)
|
||||||
return calls
|
return calls
|
||||||
@@ -1010,6 +1105,21 @@ class HfProtocol(pyee.EventEmitter):
|
|||||||
await self.update_ag_indicator(
|
await self.update_ag_indicator(
|
||||||
int(result.parameters[0]), int(result.parameters[1])
|
int(result.parameters[0]), int(result.parameters[1])
|
||||||
)
|
)
|
||||||
|
elif result.code == "+VGS":
|
||||||
|
self.emit('speaker_volume', int(result.parameters[0]))
|
||||||
|
elif result.code == "+VGM":
|
||||||
|
self.emit('microphone_volume', int(result.parameters[0]))
|
||||||
|
elif result.code == "RING":
|
||||||
|
self.emit('ring')
|
||||||
|
elif result.code == "+CLIP":
|
||||||
|
self.emit(
|
||||||
|
'cli_notification', CallLineIdentification.parse_from(result.parameters)
|
||||||
|
)
|
||||||
|
elif result.code == "+BVRA":
|
||||||
|
# TODO: Support Enhanced Voice Recognition.
|
||||||
|
self.emit(
|
||||||
|
'voice_recognition', VoiceRecognitionState(int(result.parameters[0]))
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logging.info(f"unhandled unsolicited response {result.code}")
|
logging.info(f"unhandled unsolicited response {result.code}")
|
||||||
|
|
||||||
@@ -1050,6 +1160,14 @@ class AgProtocol(pyee.EventEmitter):
|
|||||||
answer: Emit when HF sends ATA to answer phone call.
|
answer: Emit when HF sends ATA to answer phone call.
|
||||||
hang_up: Emit when HF sends AT+CHUP to hang up phone call.
|
hang_up: Emit when HF sends AT+CHUP to hang up phone call.
|
||||||
dial: Emit when HF sends ATD to dial phone call.
|
dial: Emit when HF sends ATD to dial phone call.
|
||||||
|
voice_recognition: Emit when HF requests voice recognition state.
|
||||||
|
Args:
|
||||||
|
vrec: VoiceRecognitionState
|
||||||
|
call_hold: Emit when HF requests call hold operation.
|
||||||
|
Args:
|
||||||
|
operation: CallHoldOperation
|
||||||
|
call_index: Optional[int]
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
supported_hf_features: int
|
supported_hf_features: int
|
||||||
@@ -1066,10 +1184,12 @@ class AgProtocol(pyee.EventEmitter):
|
|||||||
|
|
||||||
read_buffer: bytearray
|
read_buffer: bytearray
|
||||||
active_codec: AudioCodec
|
active_codec: AudioCodec
|
||||||
|
calls: List[CallInfo]
|
||||||
|
|
||||||
indicator_report_enabled: bool
|
indicator_report_enabled: bool
|
||||||
inband_ringtone_enabled: bool
|
inband_ringtone_enabled: bool
|
||||||
cme_error_enabled: bool
|
cme_error_enabled: bool
|
||||||
|
cli_notification_enabled: bool
|
||||||
_remained_slc_setup_features: Set[HfFeature]
|
_remained_slc_setup_features: Set[HfFeature]
|
||||||
|
|
||||||
def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
|
def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
|
||||||
@@ -1079,6 +1199,7 @@ class AgProtocol(pyee.EventEmitter):
|
|||||||
self.dlc = dlc
|
self.dlc = dlc
|
||||||
self.read_buffer = bytearray()
|
self.read_buffer = bytearray()
|
||||||
self.active_codec = AudioCodec.CVSD
|
self.active_codec = AudioCodec.CVSD
|
||||||
|
self.calls = []
|
||||||
|
|
||||||
# Build local features.
|
# Build local features.
|
||||||
self.supported_ag_features = sum(configuration.supported_ag_features)
|
self.supported_ag_features = sum(configuration.supported_ag_features)
|
||||||
@@ -1095,6 +1216,7 @@ class AgProtocol(pyee.EventEmitter):
|
|||||||
self.supported_audio_codecs = []
|
self.supported_audio_codecs = []
|
||||||
self.indicator_report_enabled = False
|
self.indicator_report_enabled = False
|
||||||
self.cme_error_enabled = False
|
self.cme_error_enabled = False
|
||||||
|
self.cli_notification_enabled = False
|
||||||
|
|
||||||
self.hf_indicators = collections.OrderedDict()
|
self.hf_indicators = collections.OrderedDict()
|
||||||
|
|
||||||
@@ -1168,6 +1290,21 @@ class AgProtocol(pyee.EventEmitter):
|
|||||||
self.inband_ringtone_enabled = enabled
|
self.inband_ringtone_enabled = enabled
|
||||||
self.send_response(f'+BSIR: {1 if enabled else 0}')
|
self.send_response(f'+BSIR: {1 if enabled else 0}')
|
||||||
|
|
||||||
|
def set_speaker_volume(self, level: int) -> None:
|
||||||
|
"""Reports speaker volume."""
|
||||||
|
|
||||||
|
self.send_response(f'+VGS: {level}')
|
||||||
|
|
||||||
|
def set_microphone_volume(self, level: int) -> None:
|
||||||
|
"""Reports microphone volume."""
|
||||||
|
|
||||||
|
self.send_response(f'+VGM: {level}')
|
||||||
|
|
||||||
|
def send_ring(self) -> None:
|
||||||
|
"""Sends RING command to trigger ringtone on HF."""
|
||||||
|
|
||||||
|
self.send_response('RING')
|
||||||
|
|
||||||
def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None:
|
def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None:
|
||||||
"""Updates AG indicator.
|
"""Updates AG indicator.
|
||||||
|
|
||||||
@@ -1212,6 +1349,14 @@ class AgProtocol(pyee.EventEmitter):
|
|||||||
if (new_codec := await at_bcs_future) != codec:
|
if (new_codec := await at_bcs_future) != codec:
|
||||||
raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}')
|
raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}')
|
||||||
|
|
||||||
|
def send_cli_notification(self, cli: CallLineIdentification) -> None:
|
||||||
|
"""Sends +CLIP CLI notification."""
|
||||||
|
|
||||||
|
if not self.cli_notification_enabled:
|
||||||
|
logger.warning('Try to send CLIP while CLI notification is not enabled')
|
||||||
|
|
||||||
|
self.send_response(f'+CLIP: {cli.to_clip_string()}')
|
||||||
|
|
||||||
def _check_remained_slc_commands(self) -> None:
|
def _check_remained_slc_commands(self) -> None:
|
||||||
if not self._remained_slc_setup_features:
|
if not self._remained_slc_setup_features:
|
||||||
self.emit('slc_complete')
|
self.emit('slc_complete')
|
||||||
@@ -1240,6 +1385,52 @@ class AgProtocol(pyee.EventEmitter):
|
|||||||
self.send_ok()
|
self.send_ok()
|
||||||
self.emit('codec_negotiation', self.active_codec)
|
self.emit('codec_negotiation', self.active_codec)
|
||||||
|
|
||||||
|
def _on_bvra(self, vrec: bytes) -> None:
|
||||||
|
self.send_ok()
|
||||||
|
self.emit('voice_recognition', VoiceRecognitionState(int(vrec)))
|
||||||
|
|
||||||
|
def _on_chld(self, operation_code: bytes) -> None:
|
||||||
|
call_index: Optional[int] = None
|
||||||
|
if len(operation_code) > 1:
|
||||||
|
call_index = int(operation_code[1:])
|
||||||
|
operation_code = operation_code[:1] + b'x'
|
||||||
|
try:
|
||||||
|
operation = CallHoldOperation(operation_code.decode())
|
||||||
|
except:
|
||||||
|
logger.error(f'Invalid operation: {operation_code.decode()}')
|
||||||
|
self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED)
|
||||||
|
return
|
||||||
|
|
||||||
|
if operation not in self.supported_ag_call_hold_operations:
|
||||||
|
logger.error(f'Unsupported operation: {operation_code.decode()}')
|
||||||
|
self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED)
|
||||||
|
|
||||||
|
if call_index is not None and not any(
|
||||||
|
call.index == call_index for call in self.calls
|
||||||
|
):
|
||||||
|
logger.error(f'No matching call {call_index}')
|
||||||
|
self.send_cme_error(CmeError.INVALID_INDEX)
|
||||||
|
|
||||||
|
# Real three-way calls have more complicated situations, but this is not a popular issue - let users to handle the remaining :)
|
||||||
|
|
||||||
|
self.send_ok()
|
||||||
|
self.emit('call_hold', operation, call_index)
|
||||||
|
|
||||||
|
def _on_chld_test(self) -> None:
|
||||||
|
if not self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
|
||||||
|
self.send_error()
|
||||||
|
return
|
||||||
|
|
||||||
|
self.send_response(
|
||||||
|
'+CHLD:'
|
||||||
|
+ ','.join(
|
||||||
|
operation.value for operation in self.supported_ag_call_hold_operations
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.send_ok()
|
||||||
|
self._remained_slc_setup_features.remove(HfFeature.THREE_WAY_CALLING)
|
||||||
|
self._check_remained_slc_commands()
|
||||||
|
|
||||||
def _on_cind_test(self) -> None:
|
def _on_cind_test(self) -> None:
|
||||||
if not self.ag_indicators:
|
if not self.ag_indicators:
|
||||||
self.send_cme_error(CmeError.NOT_FOUND)
|
self.send_cme_error(CmeError.NOT_FOUND)
|
||||||
@@ -1364,6 +1555,26 @@ class AgProtocol(pyee.EventEmitter):
|
|||||||
self.emit('hang_up')
|
self.emit('hang_up')
|
||||||
self.send_ok()
|
self.send_ok()
|
||||||
|
|
||||||
|
def _on_clcc(self) -> None:
|
||||||
|
for call in self.calls:
|
||||||
|
response = (
|
||||||
|
f'+CLCC: {call.index}'
|
||||||
|
f',{call.direction.value}'
|
||||||
|
f',{call.status.value}'
|
||||||
|
f',{call.mode.value}'
|
||||||
|
f',{call.multi_party.value}'
|
||||||
|
f',\"{call.number}\"'
|
||||||
|
if call.number is not None
|
||||||
|
else '' f',{call.type}' if call.type is not None else ''
|
||||||
|
)
|
||||||
|
self.send_response(response)
|
||||||
|
self.send_ok()
|
||||||
|
|
||||||
|
def _on_clip(self, enabled: bytes) -> None:
|
||||||
|
if not self.supports_hf_feature(HfFeature.CLI_PRESENTATION_CAPABILITY):
|
||||||
|
logger.error('Remote doesn not support CLI but sends AT+CLIP')
|
||||||
|
self.cli_notification_enabled = True if enabled == b'1' else False
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# Normative SDP definitions
|
# Normative SDP definitions
|
||||||
@@ -1596,7 +1807,7 @@ async def find_hf_sdp_record(
|
|||||||
connection: ACL connection to make SDP search.
|
connection: ACL connection to make SDP search.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary mapping from channel number to service class UUID list.
|
Tuple of (<RFCOMM channel>, <Profile Version>, <HF SDP features>)
|
||||||
"""
|
"""
|
||||||
async with sdp.Client(connection) as sdp_client:
|
async with sdp.Client(connection) as sdp_client:
|
||||||
search_result = await sdp_client.search_attributes(
|
search_result = await sdp_client.search_attributes(
|
||||||
@@ -1640,7 +1851,7 @@ async def find_ag_sdp_record(
|
|||||||
connection: ACL connection to make SDP search.
|
connection: ACL connection to make SDP search.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary mapping from channel number to service class UUID list.
|
Tuple of (<RFCOMM channel>, <Profile Version>, <AG SDP features>)
|
||||||
"""
|
"""
|
||||||
async with sdp.Client(connection) as sdp_client:
|
async with sdp.Client(connection) as sdp_client:
|
||||||
search_result = await sdp_client.search_attributes(
|
search_result = await sdp_client.search_attributes(
|
||||||
|
|||||||
@@ -43,6 +43,9 @@ def _default_hf_configuration() -> hfp.HfConfiguration:
|
|||||||
hfp.HfFeature.CODEC_NEGOTIATION,
|
hfp.HfFeature.CODEC_NEGOTIATION,
|
||||||
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
||||||
hfp.HfFeature.HF_INDICATORS,
|
hfp.HfFeature.HF_INDICATORS,
|
||||||
|
hfp.HfFeature.ENHANCED_CALL_STATUS,
|
||||||
|
hfp.HfFeature.THREE_WAY_CALLING,
|
||||||
|
hfp.HfFeature.CLI_PRESENTATION_CAPABILITY,
|
||||||
],
|
],
|
||||||
supported_hf_indicators=[
|
supported_hf_indicators=[
|
||||||
hfp.HfIndicator.ENHANCED_SAFETY,
|
hfp.HfIndicator.ENHANCED_SAFETY,
|
||||||
@@ -57,7 +60,11 @@ def _default_hf_configuration() -> hfp.HfConfiguration:
|
|||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
|
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
|
||||||
return hfp.HfSdpFeature.WIDE_BAND
|
return (
|
||||||
|
hfp.HfSdpFeature.WIDE_BAND
|
||||||
|
| hfp.HfSdpFeature.THREE_WAY_CALLING
|
||||||
|
| hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@@ -69,6 +76,8 @@ def _default_ag_configuration() -> hfp.AgConfiguration:
|
|||||||
hfp.AgFeature.REJECT_CALL,
|
hfp.AgFeature.REJECT_CALL,
|
||||||
hfp.AgFeature.CODEC_NEGOTIATION,
|
hfp.AgFeature.CODEC_NEGOTIATION,
|
||||||
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
||||||
|
hfp.AgFeature.ENHANCED_CALL_STATUS,
|
||||||
|
hfp.AgFeature.THREE_WAY_CALLING,
|
||||||
],
|
],
|
||||||
supported_ag_indicators=[
|
supported_ag_indicators=[
|
||||||
hfp.AgIndicatorState.call(),
|
hfp.AgIndicatorState.call(),
|
||||||
@@ -83,14 +92,26 @@ def _default_ag_configuration() -> hfp.AgConfiguration:
|
|||||||
hfp.HfIndicator.ENHANCED_SAFETY,
|
hfp.HfIndicator.ENHANCED_SAFETY,
|
||||||
hfp.HfIndicator.BATTERY_LEVEL,
|
hfp.HfIndicator.BATTERY_LEVEL,
|
||||||
],
|
],
|
||||||
supported_ag_call_hold_operations=[],
|
supported_ag_call_hold_operations=[
|
||||||
|
hfp.CallHoldOperation.ADD_HELD_CALL,
|
||||||
|
hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS,
|
||||||
|
hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT,
|
||||||
|
hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS,
|
||||||
|
hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS,
|
||||||
|
hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL,
|
||||||
|
hfp.CallHoldOperation.CONNECT_TWO_CALLS,
|
||||||
|
],
|
||||||
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
|
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
|
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
|
||||||
return hfp.AgSdpFeature.WIDE_BAND | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
|
return (
|
||||||
|
hfp.AgSdpFeature.WIDE_BAND
|
||||||
|
| hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
|
||||||
|
| hfp.AgSdpFeature.THREE_WAY_CALLING
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@@ -165,6 +186,7 @@ async def test_slc_with_minimal_features():
|
|||||||
|
|
||||||
assert hf.supported_ag_features == ag.supported_ag_features
|
assert hf.supported_ag_features == ag.supported_ag_features
|
||||||
assert hf.supported_hf_features == ag.supported_hf_features
|
assert hf.supported_hf_features == ag.supported_hf_features
|
||||||
|
assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations
|
||||||
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
|
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
|
||||||
assert a.indicator == b.indicator
|
assert a.indicator == b.indicator
|
||||||
assert a.current_status == b.current_status
|
assert a.current_status == b.current_status
|
||||||
@@ -177,6 +199,7 @@ async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
|||||||
|
|
||||||
assert hf.supported_ag_features == ag.supported_ag_features
|
assert hf.supported_ag_features == ag.supported_ag_features
|
||||||
assert hf.supported_hf_features == ag.supported_hf_features
|
assert hf.supported_hf_features == ag.supported_hf_features
|
||||||
|
assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations
|
||||||
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
|
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
|
||||||
assert a.indicator == b.indicator
|
assert a.indicator == b.indicator
|
||||||
assert a.current_status == b.current_status
|
assert a.current_status == b.current_status
|
||||||
@@ -281,6 +304,175 @@ async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProto
|
|||||||
await future
|
await future
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_query_calls_without_calls(
|
||||||
|
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||||
|
):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
|
||||||
|
await hf.query_current_calls() == []
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_query_calls_with_calls(
|
||||||
|
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||||
|
):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
ag.calls.append(
|
||||||
|
hfp.CallInfo(
|
||||||
|
index=1,
|
||||||
|
direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL,
|
||||||
|
status=hfp.CallInfoStatus.ACTIVE,
|
||||||
|
mode=hfp.CallInfoMode.VOICE,
|
||||||
|
multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
|
||||||
|
number='123456789',
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
await hf.query_current_calls() == ag.calls
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"operation,",
|
||||||
|
(
|
||||||
|
hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS,
|
||||||
|
hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS,
|
||||||
|
hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS,
|
||||||
|
hfp.CallHoldOperation.ADD_HELD_CALL,
|
||||||
|
hfp.CallHoldOperation.CONNECT_TWO_CALLS,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
async def test_hold_call_without_call_index(
|
||||||
|
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
|
||||||
|
operation: hfp.CallHoldOperation,
|
||||||
|
):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
call_hold_future = asyncio.get_running_loop().create_future()
|
||||||
|
ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index)))
|
||||||
|
|
||||||
|
await hf.execute_command(f"AT+CHLD={operation.value}")
|
||||||
|
|
||||||
|
assert (await call_hold_future) == (operation, None)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"operation,",
|
||||||
|
(
|
||||||
|
hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL,
|
||||||
|
hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
async def test_hold_call_with_call_index(
|
||||||
|
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
|
||||||
|
operation: hfp.CallHoldOperation,
|
||||||
|
):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
call_hold_future = asyncio.get_running_loop().create_future()
|
||||||
|
ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index)))
|
||||||
|
ag.calls.append(
|
||||||
|
hfp.CallInfo(
|
||||||
|
index=1,
|
||||||
|
direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL,
|
||||||
|
status=hfp.CallInfoStatus.ACTIVE,
|
||||||
|
mode=hfp.CallInfoMode.VOICE,
|
||||||
|
multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
|
||||||
|
number='123456789',
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
await hf.execute_command(f"AT+CHLD={operation.value.replace('x', '1')}")
|
||||||
|
|
||||||
|
assert (await call_hold_future) == (operation, 1)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
ring_future = asyncio.get_running_loop().create_future()
|
||||||
|
hf.on("ring", lambda: ring_future.set_result(None))
|
||||||
|
|
||||||
|
ag.send_ring()
|
||||||
|
|
||||||
|
await ring_future
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
speaker_volume_future = asyncio.get_running_loop().create_future()
|
||||||
|
hf.on("speaker_volume", speaker_volume_future.set_result)
|
||||||
|
|
||||||
|
ag.set_speaker_volume(10)
|
||||||
|
|
||||||
|
assert await speaker_volume_future == 10
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_microphone_volume(
|
||||||
|
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||||
|
):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
microphone_volume_future = asyncio.get_running_loop().create_future()
|
||||||
|
hf.on("microphone_volume", microphone_volume_future.set_result)
|
||||||
|
|
||||||
|
ag.set_microphone_volume(10)
|
||||||
|
|
||||||
|
assert await microphone_volume_future == 10
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
cli_notification_future = asyncio.get_running_loop().create_future()
|
||||||
|
hf.on("cli_notification", cli_notification_future.set_result)
|
||||||
|
|
||||||
|
ag.send_cli_notification(
|
||||||
|
hfp.CallLineIdentification(number="\"123456789\"", type=129, alpha="\"Bumble\"")
|
||||||
|
)
|
||||||
|
|
||||||
|
assert await cli_notification_future == hfp.CallLineIdentification(
|
||||||
|
number="123456789", type=129, alpha="Bumble", subaddr="", satype=None
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_voice_recognition_from_hf(
|
||||||
|
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||||
|
):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
voice_recognition_future = asyncio.get_running_loop().create_future()
|
||||||
|
ag.on("voice_recognition", voice_recognition_future.set_result)
|
||||||
|
|
||||||
|
await hf.execute_command("AT+BVRA=1")
|
||||||
|
|
||||||
|
assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_voice_recognition_from_ag(
|
||||||
|
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||||
|
):
|
||||||
|
hf, ag = hfp_connections
|
||||||
|
voice_recognition_future = asyncio.get_running_loop().create_future()
|
||||||
|
hf.on("voice_recognition", voice_recognition_future.set_result)
|
||||||
|
|
||||||
|
ag.send_response("+BVRA: 1")
|
||||||
|
|
||||||
|
assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_hf_sdp_record():
|
async def test_hf_sdp_record():
|
||||||
|
|||||||
Reference in New Issue
Block a user