HFP AG implementation

This commit is contained in:
Josh Wu
2024-04-05 23:46:06 +08:00
committed by zxzxwu
parent c65188dcbf
commit deba181857
5 changed files with 1024 additions and 225 deletions

View File

@@ -15,6 +15,9 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import collections
import collections.abc import collections.abc
import logging import logging
import asyncio import asyncio
@@ -22,16 +25,32 @@ import dataclasses
import enum import enum
import traceback import traceback
import pyee import pyee
from typing import Dict, List, Union, Set, Any, Optional, Type, TYPE_CHECKING import re
from typing import (
Dict,
List,
Union,
Set,
Any,
Optional,
Type,
Tuple,
ClassVar,
Iterable,
TYPE_CHECKING,
)
from typing_extensions import Self from typing_extensions import Self
from bumble import at from bumble import at
from bumble import device
from bumble import rfcomm from bumble import rfcomm
from bumble import sdp
from bumble.colors import color from bumble.colors import color
from bumble.core import ( from bumble.core import (
ProtocolError, ProtocolError,
BT_GENERIC_AUDIO_SERVICE, BT_GENERIC_AUDIO_SERVICE,
BT_HANDSFREE_SERVICE, BT_HANDSFREE_SERVICE,
BT_HEADSET_AUDIO_GATEWAY_SERVICE,
BT_L2CAP_PROTOCOL_ID, BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID,
) )
@@ -40,15 +59,6 @@ from bumble.hci import (
CodingFormat, CodingFormat,
CodecID, CodecID,
) )
from bumble.sdp import (
DataElement,
ServiceAttribute,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -329,6 +339,21 @@ class CallInfo:
type: Optional[int] = None type: Optional[int] = None
class CmeError(enum.IntEnum):
"""
CME ERROR codes (partial listed).
TS 127 007 - V6.8.0, 9.2.1 General errors
"""
PHONE_FAILURE = 0
OPERATION_NOT_ALLOWED = 3
OPERATION_NOT_SUPPORTED = 4
MEMORY_FULL = 20
INVALID_INDEX = 21
NOT_FOUND = 22
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Hands-Free Control Interoperability Requirements # Hands-Free Control Interoperability Requirements
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -402,12 +427,21 @@ STATUS_CODES = [
@dataclasses.dataclass @dataclasses.dataclass
class Configuration: class HfConfiguration:
supported_hf_features: List[HfFeature] supported_hf_features: List[HfFeature]
supported_hf_indicators: List[HfIndicator] supported_hf_indicators: List[HfIndicator]
supported_audio_codecs: List[AudioCodec] supported_audio_codecs: List[AudioCodec]
@dataclasses.dataclass
class AgConfiguration:
supported_ag_features: Iterable[AgFeature]
supported_ag_indicators: collections.abc.Sequence[AgIndicatorState]
supported_hf_indicators: Iterable[HfIndicator]
supported_ag_call_hold_operations: Iterable[CallHoldOperation]
supported_audio_codecs: Iterable[AudioCodec]
class AtResponseType(enum.Enum): class AtResponseType(enum.Enum):
""" """
Indicates if a response is expected from an AT command, and if multiple responses are accepted. Indicates if a response is expected from an AT command, and if multiple responses are accepted.
@@ -435,18 +469,148 @@ class AtResponse:
) )
@dataclasses.dataclass
class AtCommand:
class SubCode(str, enum.Enum):
NONE = ''
SET = '='
TEST = '=?'
READ = '?'
code: str
sub_code: SubCode
parameters: list
_PARSE_PATTERN: ClassVar[re.Pattern] = re.compile(
r'AT\+(?P<code>[A-Z]+)(?P<sub_code>=\?|=|\?)?(?P<parameters>.*)'
)
@classmethod
def parse_from(cls: Type[Self], buffer: bytearray) -> Self:
if not (match := cls._PARSE_PATTERN.fullmatch(buffer.decode())):
if buffer.startswith(b'ATA'):
return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[])
if buffer.startswith(b'ATD'):
return cls(
code='D', sub_code=AtCommand.SubCode.NONE, parameters=[buffer[3:]]
)
raise HfpProtocolError('Invalid command')
parameters = []
if parameters_text := match.group('parameters'):
parameters = at.parse_parameters(parameters_text.encode())
return cls(
code=match.group('code'),
sub_code=AtCommand.SubCode(match.group('sub_code') or ''),
parameters=parameters,
)
@dataclasses.dataclass @dataclasses.dataclass
class AgIndicatorState: class AgIndicatorState:
description: str """State wrapper of AG indicator.
index: int
Attributes:
indicator: Indicator of this indicator state.
supported_values: Supported values of this indicator.
current_status: Current status of this indicator.
index: (HF only) Index of this indicator.
enabled: (AG only) Whether this indicator is enabled to report.
on_test_text: Text message reported in AT+CIND=? of this indicator.
"""
indicator: AgIndicator
supported_values: Set[int] supported_values: Set[int]
current_status: int current_status: int
index: Optional[int] = None
enabled: bool = True
@property
def on_test_text(self) -> str:
min_value = min(self.supported_values)
max_value = max(self.supported_values)
if len(self.supported_values) == (max_value - min_value + 1):
supported_values_text = f'({min_value}-{max_value})'
else:
supported_values_text = (
f'({",".join(str(v) for v in self.supported_values)})'
)
return f'(\"{self.indicator.value}\",{supported_values_text})'
@classmethod
def call(cls: Type[Self]) -> Self:
"""Default call indicator state."""
return cls(
indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
)
@classmethod
def callsetup(cls: Type[Self]) -> Self:
"""Default callsetup indicator state."""
return cls(
indicator=AgIndicator.CALL_SETUP,
supported_values={0, 1, 2, 3},
current_status=0,
)
@classmethod
def callheld(cls: Type[Self]) -> Self:
"""Default call indicator state."""
return cls(
indicator=AgIndicator.CALL_HELD,
supported_values={0, 1, 2},
current_status=0,
)
@classmethod
def service(cls: Type[Self]) -> Self:
"""Default service indicator state."""
return cls(
indicator=AgIndicator.SERVICE, supported_values={0, 1}, current_status=0
)
@classmethod
def signal(cls: Type[Self]) -> Self:
"""Default signal indicator state."""
return cls(
indicator=AgIndicator.SIGNAL,
supported_values={0, 1, 2, 3, 4, 5},
current_status=0,
)
@classmethod
def roam(cls: Type[Self]) -> Self:
"""Default roam indicator state."""
return cls(
indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
)
@classmethod
def battchg(cls: Type[Self]) -> Self:
"""Default battery charge indicator state."""
return cls(
indicator=AgIndicator.BATTERY_CHARGE,
supported_values={0, 1, 2, 3, 4, 5},
current_status=0,
)
@dataclasses.dataclass @dataclasses.dataclass
class HfIndicatorState: class HfIndicatorState:
"""State wrapper of HF indicator.
Attributes:
indicator: Indicator of this indicator state.
supported: Whether this indicator is supported.
enabled: Whether this indicator is enabled.
current_status: Current (last-reported) status value of this indicaotr.
"""
indicator: HfIndicator
supported: bool = False supported: bool = False
enabled: bool = False enabled: bool = False
current_status: int = 0
class HfProtocol(pyee.EventEmitter): class HfProtocol(pyee.EventEmitter):
@@ -464,6 +628,9 @@ class HfProtocol(pyee.EventEmitter):
ag_indicator: AgIndicator ag_indicator: AgIndicator
""" """
class HfLoopTermination(HfpProtocolError): ...
"""Termination signal for run() loop."""
supported_hf_features: int supported_hf_features: int
supported_audio_codecs: List[AudioCodec] supported_audio_codecs: List[AudioCodec]
@@ -477,14 +644,14 @@ class HfProtocol(pyee.EventEmitter):
command_lock: asyncio.Lock command_lock: asyncio.Lock
if TYPE_CHECKING: if TYPE_CHECKING:
response_queue: asyncio.Queue[AtResponse] response_queue: asyncio.Queue[AtResponse]
unsolicited_queue: asyncio.Queue[AtResponse] unsolicited_queue: asyncio.Queue[Optional[AtResponse]]
else: else:
response_queue: asyncio.Queue response_queue: asyncio.Queue
unsolicited_queue: asyncio.Queue unsolicited_queue: asyncio.Queue
read_buffer: bytearray read_buffer: bytearray
active_codec: AudioCodec active_codec: AudioCodec
def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None: def __init__(self, dlc: rfcomm.DLC, configuration: HfConfiguration) -> None:
super().__init__() super().__init__()
# Configure internal state. # Configure internal state.
@@ -494,13 +661,14 @@ class HfProtocol(pyee.EventEmitter):
self.unsolicited_queue = asyncio.Queue() self.unsolicited_queue = asyncio.Queue()
self.read_buffer = bytearray() self.read_buffer = bytearray()
self.active_codec = AudioCodec.CVSD self.active_codec = AudioCodec.CVSD
self._slc_initialized = False
# Build local features. # Build local features.
self.supported_hf_features = sum(configuration.supported_hf_features) self.supported_hf_features = sum(configuration.supported_hf_features)
self.supported_audio_codecs = configuration.supported_audio_codecs self.supported_audio_codecs = configuration.supported_audio_codecs
self.hf_indicators = { self.hf_indicators = {
indicator: HfIndicatorState() indicator: HfIndicatorState(indicator=indicator)
for indicator in configuration.supported_hf_indicators for indicator in configuration.supported_hf_indicators
} }
@@ -511,6 +679,10 @@ class HfProtocol(pyee.EventEmitter):
# Bind the AT reader to the RFCOMM channel. # Bind the AT reader to the RFCOMM channel.
self.dlc.sink = self._read_at self.dlc.sink = self._read_at
# Stop the run() loop when L2CAP is closed.
self.dlc.multiplexer.l2cap_channel.on(
'close', lambda: self.unsolicited_queue.put_nowait(None)
)
def supports_hf_feature(self, feature: HfFeature) -> bool: def supports_hf_feature(self, feature: HfFeature) -> bool:
return (self.supported_hf_features & feature) != 0 return (self.supported_hf_features & feature) != 0
@@ -621,7 +793,7 @@ class HfProtocol(pyee.EventEmitter):
# If both the HF and AG do support the Codec Negotiation feature # If both the HF and AG do support the Codec Negotiation feature
# then the HF shall send the AT+BAC=<HF available codecs> command to # then the HF shall send the AT+BAC=<HF available codecs> command to
# the AG to notify the AG of the available codecs in the HF. # the AG to notify the AG of the available codecs in the HF.
codecs = [str(c) for c in self.supported_audio_codecs] codecs = [str(c.value) for c in self.supported_audio_codecs]
await self.execute_command(f"AT+BAC={','.join(codecs)}") await self.execute_command(f"AT+BAC={','.join(codecs)}")
# 4.2.1.3 AG Indicators # 4.2.1.3 AG Indicators
@@ -639,7 +811,7 @@ class HfProtocol(pyee.EventEmitter):
self.ag_indicators = [] self.ag_indicators = []
for index, indicator in enumerate(response.parameters): for index, indicator in enumerate(response.parameters):
description = indicator[0].decode() description = AgIndicator(indicator[0].decode())
supported_values = [] supported_values = []
for value in indicator[1]: for value in indicator[1]:
value = value.split(b'-') value = value.split(b'-')
@@ -697,7 +869,7 @@ class HfProtocol(pyee.EventEmitter):
# shall send the AT+BIND=<HF supported HF indicators> command to the AG # shall send the AT+BIND=<HF supported HF indicators> command to the AG
# to notify the AG of the supported indicators assigned numbers in the # to notify the AG of the supported indicators assigned numbers in the
# HF. The AG shall respond with OK # HF. The AG shall respond with OK
indicators = [str(i) for i in self.hf_indicators.keys()] indicators = [str(i.value) for i in self.hf_indicators]
await self.execute_command(f"AT+BIND={','.join(indicators)}") await self.execute_command(f"AT+BIND={','.join(indicators)}")
# After having provided the AG with the HF indicators it supports, # After having provided the AG with the HF indicators it supports,
@@ -733,6 +905,7 @@ class HfProtocol(pyee.EventEmitter):
self.hf_indicators[indicator].enabled = True self.hf_indicators[indicator].enabled = True
logger.info("SLC setup completed") logger.info("SLC setup completed")
self._slc_initialized = True
async def setup_audio_connection(self): async def setup_audio_connection(self):
"""4.11.2 Audio Connection Setup by HF.""" """4.11.2 Audio Connection Setup by HF."""
@@ -824,11 +997,13 @@ class HfProtocol(pyee.EventEmitter):
ag_indicator = self.ag_indicators[index - 1] ag_indicator = self.ag_indicators[index - 1]
ag_indicator.current_status = value ag_indicator.current_status = value
self.emit('ag_indicator', ag_indicator) self.emit('ag_indicator', ag_indicator)
logger.info(f"AG indicator updated: {ag_indicator.description}, {value}") logger.info(f"AG indicator updated: {ag_indicator.indicator}, {value}")
async def handle_unsolicited(self): async def handle_unsolicited(self):
"""Handle unsolicited result codes sent by the audio gateway.""" """Handle unsolicited result codes sent by the audio gateway."""
result = await self.unsolicited_queue.get() result = await self.unsolicited_queue.get()
if not result:
raise HfProtocol.HfLoopTermination()
if result.code == "+BCS": if result.code == "+BCS":
await self.setup_codec_connection(int(result.parameters[0])) await self.setup_codec_connection(int(result.parameters[0]))
elif result.code == "+CIEV": elif result.code == "+CIEV":
@@ -846,14 +1021,350 @@ class HfProtocol(pyee.EventEmitter):
""" """
try: try:
await self.initiate_slc() if not self._slc_initialized:
await self.initiate_slc()
while True: while True:
await self.handle_unsolicited() await self.handle_unsolicited()
except HfProtocol.HfLoopTermination:
logger.info('Loop terminated')
except Exception: except Exception:
logger.error("HFP-HF protocol failed with the following error:") logger.error("HFP-HF protocol failed with the following error:")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
class AgProtocol(pyee.EventEmitter):
"""
Implementation for the Audio-Gateway side of the Hands-Free profile.
Reference specification Hands-Free Profile v1.8.
Emitted events:
slc_complete: Emit when SLC procedure is completed.
codec_negotiation: When codec is renegotiated, notify the new codec.
Args:
active_codec: AudioCodec
hf_indicator: When HF update their indicators, notify the new state.
Args:
hf_indicator: HfIndicator
codec_connection_request: Emit when HF sends AT+BCC to request codec connection.
answer: Emit when HF sends ATA to answer phone call.
hang_up: Emit when HF sends AT+CHUP to hang up phone call.
dial: Emit when HF sends ATD to dial phone call.
"""
supported_hf_features: int
supported_hf_indicators: Set[HfIndicator]
supported_audio_codecs: List[AudioCodec]
supported_ag_features: int
supported_ag_call_hold_operations: List[CallHoldOperation]
ag_indicators: List[AgIndicatorState]
hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState]
dlc: rfcomm.DLC
read_buffer: bytearray
active_codec: AudioCodec
indicator_report_enabled: bool
inband_ringtone_enabled: bool
cme_error_enabled: bool
_remained_slc_setup_features: Set[HfFeature]
def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
super().__init__()
# Configure internal state.
self.dlc = dlc
self.read_buffer = bytearray()
self.active_codec = AudioCodec.CVSD
# Build local features.
self.supported_ag_features = sum(configuration.supported_ag_features)
self.supported_ag_call_hold_operations = list(
configuration.supported_ag_call_hold_operations
)
self.ag_indicators = list(configuration.supported_ag_indicators)
self.supported_hf_indicators = set(configuration.supported_hf_indicators)
self.inband_ringtone_enabled = True
self._remained_slc_setup_features = set()
# Clear remote features.
self.supported_hf_features = 0
self.supported_audio_codecs = []
self.indicator_report_enabled = False
self.cme_error_enabled = False
self.hf_indicators = collections.OrderedDict()
# Bind the AT reader to the RFCOMM channel.
self.dlc.sink = self._read_at
def supports_hf_feature(self, feature: HfFeature) -> bool:
return (self.supported_hf_features & feature) != 0
def supports_ag_feature(self, feature: AgFeature) -> bool:
return (self.supported_ag_features & feature) != 0
def _read_at(self, data: bytes):
"""
Reads AT messages from the RFCOMM channel.
"""
# Append to the read buffer.
self.read_buffer.extend(data)
# Locate the trailer.
trailer = self.read_buffer.find(b'\r')
if trailer == -1:
return
# Isolate the AT response code and parameters.
raw_command = self.read_buffer[:trailer]
command = AtCommand.parse_from(raw_command)
logger.debug(f"<<< {raw_command.decode()}")
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 1 :]
if command.sub_code == AtCommand.SubCode.TEST:
handler_name = f'_on_{command.code.lower()}_test'
elif command.sub_code == AtCommand.SubCode.READ:
handler_name = f'_on_{command.code.lower()}_read'
else:
handler_name = f'_on_{command.code.lower()}'
if handler := getattr(self, handler_name, None):
handler(*command.parameters)
else:
logger.warning('Handler %s not found', handler_name)
self.send_response('ERROR')
def send_response(self, response: str) -> None:
"""Sends an AT response."""
self.dlc.write(f'\r\n{response}\r\n')
def send_cme_error(self, error_code: CmeError) -> None:
"""Sends an CME ERROR response.
If CME Error is not enabled by HF, sends ERROR instead.
"""
if self.cme_error_enabled:
self.send_response(f'+CME ERROR: {error_code.value}')
else:
self.send_error()
def send_ok(self) -> None:
"""Sends an OK response."""
self.send_response('OK')
def send_error(self) -> None:
"""Sends an ERROR response."""
self.send_response('ERROR')
def set_inband_ringtone_enabled(self, enabled: bool) -> None:
"""Enables or disables in-band ringtone."""
self.inband_ringtone_enabled = enabled
self.send_response(f'+BSIR: {1 if enabled else 0}')
def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None:
"""Updates AG indicator.
Args:
indicator: Name of the indicator.
value: new value of the indicator.
"""
search_result = next(
(
(index, state)
for index, state in enumerate(self.ag_indicators)
if state.indicator == indicator
),
None,
)
if not search_result:
raise KeyError(f'{indicator} is not supported.')
index, indicator_state = search_result
if not self.indicator_report_enabled:
logger.warning('AG indicator report is disabled')
if not indicator_state.enabled:
logger.warning(f'AG indicator {indicator} is disabled')
indicator_state.current_status = value
self.send_response(f'+CIEV: {index+1},{value}')
async def negotiate_codec(self, codec: AudioCodec) -> None:
"""Starts codec negotiation."""
if not self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION):
logger.warning('Local does not support Codec Negotiation')
if not self.supports_hf_feature(HfFeature.CODEC_NEGOTIATION):
logger.warning('Peer does not support Codec Negotiation')
if codec not in self.supported_audio_codecs:
logger.warning(f'{codec} is not supported by peer')
at_bcs_future = asyncio.get_running_loop().create_future()
self.once('codec_negotiation', at_bcs_future.set_result)
self.send_response(f'+BCS: {codec.value}')
if (new_codec := await at_bcs_future) != codec:
raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}')
def _check_remained_slc_commands(self) -> None:
if not self._remained_slc_setup_features:
self.emit('slc_complete')
def _on_brsf(self, hf_features: bytes) -> None:
self.supported_hf_features = int(hf_features)
self.send_response(f'+BRSF: {self.supported_ag_features}')
self.send_ok()
if self.supports_hf_feature(
HfFeature.HF_INDICATORS
) and self.supports_ag_feature(AgFeature.HF_INDICATORS):
self._remained_slc_setup_features.add(HfFeature.HF_INDICATORS)
if self.supports_hf_feature(
HfFeature.THREE_WAY_CALLING
) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
self._remained_slc_setup_features.add(HfFeature.THREE_WAY_CALLING)
def _on_bac(self, *args) -> None:
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
self.send_ok()
def _on_bcs(self, codec: bytes) -> None:
self.active_codec = AudioCodec(int(codec))
self.send_ok()
self.emit('codec_negotiation', self.active_codec)
def _on_cind_test(self) -> None:
if not self.ag_indicators:
self.send_cme_error(CmeError.NOT_FOUND)
return
indicator_list_str = ",".join(
indicator.on_test_text for indicator in self.ag_indicators
)
self.send_response(f'+CIND: {indicator_list_str}')
self.send_ok()
def _on_cind_read(self) -> None:
if not self.ag_indicators:
self.send_cme_error(CmeError.NOT_FOUND)
return
indicator_list_str = ",".join(
str(indicator.current_status) for indicator in self.ag_indicators
)
self.send_response(f'+CIND: {indicator_list_str}')
self.send_ok()
self._check_remained_slc_commands()
def _on_cmer(
self,
mode: bytes,
keypad: Optional[bytes] = None,
display: Optional[bytes] = None,
indicator: bytes = b'',
) -> None:
if int(mode) != 3 or keypad or display or int(indicator) not in (0, 1):
logger.error(
f'Unexpected values: mode={mode!r}, keypad={keypad!r}, '
f'display={display!r}, indicator={indicator!r}'
)
self.send_cme_error(CmeError.INVALID_INDEX)
self.indicator_report_enabled = bool(int(indicator))
self.send_ok()
def _on_cmee(self, enabled: bytes) -> None:
self.cme_error_enabled = bool(int(enabled))
self.send_ok()
def _on_bind(self, *args) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
return
peer_supported_indicators = set(
HfIndicator(int(indicator)) for indicator in args
)
self.hf_indicators = collections.OrderedDict(
{
indicator: HfIndicatorState(indicator=indicator)
for indicator in self.supported_hf_indicators.intersection(
peer_supported_indicators
)
}
)
self.send_ok()
def _on_bind_test(self) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
return
hf_indicator_list_str = ",".join(
str(indicator.value) for indicator in self.supported_hf_indicators
)
self.send_response(f'+BIND: ({hf_indicator_list_str})')
self.send_ok()
def _on_bind_read(self) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
return
for indicator in self.hf_indicators:
self.send_response(f'+BIND: {indicator.value},1')
self.send_ok()
self._remained_slc_setup_features.remove(HfFeature.HF_INDICATORS)
self._check_remained_slc_commands()
def _on_biev(self, index_bytes: bytes, value_bytes: bytes) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
return
index = HfIndicator(int(index_bytes))
if index not in self.hf_indicators:
self.send_error()
return
self.hf_indicators[index].current_status = int(value_bytes)
self.emit('hf_indicator', self.hf_indicators[index])
self.send_ok()
def _on_bia(self, *args) -> None:
for enabled, state in zip(args, self.ag_indicators):
state.enabled = bool(int(enabled))
self.send_ok()
def _on_bcc(self) -> None:
self.emit('codec_connection_request')
self.send_ok()
def _on_a(self) -> None:
"""ATA handler."""
self.emit('answer')
self.send_ok()
def _on_d(self, number: bytes) -> None:
"""ATD handler."""
self.emit('dial', number.decode())
self.send_ok()
def _on_chup(self) -> None:
self.emit('hang_up')
self.send_ok()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Normative SDP definitions # Normative SDP definitions
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -907,9 +1418,12 @@ class AgSdpFeature(enum.IntFlag):
VOICE_RECOGNITION_TEST = 0x80 VOICE_RECOGNITION_TEST = 0x80
def sdp_records( def make_hf_sdp_records(
service_record_handle: int, rfcomm_channel: int, configuration: Configuration service_record_handle: int,
) -> List[ServiceAttribute]: rfcomm_channel: int,
configuration: HfConfiguration,
version: ProfileVersion = ProfileVersion.V1_8,
) -> List[sdp.ServiceAttribute]:
""" """
Generates the SDP record for HFP Hands-Free support. Generates the SDP record for HFP Hands-Free support.
@@ -941,53 +1455,226 @@ def sdp_records(
hf_supported_features |= HfSdpFeature.WIDE_BAND hf_supported_features |= HfSdpFeature.WIDE_BAND
return [ return [
ServiceAttribute( sdp.ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(service_record_handle), sdp.DataElement.unsigned_integer_32(service_record_handle),
), ),
ServiceAttribute( sdp.ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence( sdp.DataElement.sequence(
[ [
DataElement.uuid(BT_HANDSFREE_SERVICE), sdp.DataElement.uuid(BT_HANDSFREE_SERVICE),
DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
] ]
), ),
), ),
ServiceAttribute( sdp.ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence( sdp.DataElement.sequence(
[ [
DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), sdp.DataElement.sequence(
DataElement.sequence( [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
),
sdp.DataElement.sequence(
[ [
DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
DataElement.unsigned_integer_8(rfcomm_channel), sdp.DataElement.unsigned_integer_8(rfcomm_channel),
] ]
), ),
] ]
), ),
), ),
ServiceAttribute( sdp.ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence( sdp.DataElement.sequence(
[ [
DataElement.sequence( sdp.DataElement.sequence(
[ [
DataElement.uuid(BT_HANDSFREE_SERVICE), sdp.DataElement.uuid(BT_HANDSFREE_SERVICE),
DataElement.unsigned_integer_16(ProfileVersion.V1_8), sdp.DataElement.unsigned_integer_16(version),
] ]
) )
] ]
), ),
), ),
ServiceAttribute( sdp.ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(hf_supported_features), sdp.DataElement.unsigned_integer_16(hf_supported_features),
), ),
] ]
def make_ag_sdp_records(
service_record_handle: int,
rfcomm_channel: int,
configuration: AgConfiguration,
version: ProfileVersion = ProfileVersion.V1_8,
) -> List[sdp.ServiceAttribute]:
"""
Generates the SDP record for HFP Audio-Gateway support.
The record exposes the features supported in the input configuration,
and the allocated RFCOMM channel.
"""
ag_supported_features = 0
if AgFeature.EC_NR in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.EC_NR
if AgFeature.THREE_WAY_CALLING in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.THREE_WAY_CALLING
if (
AgFeature.ENHANCED_VOICE_RECOGNITION_STATUS
in configuration.supported_ag_features
):
ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST
if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION
if AudioCodec.MSBC in configuration.supported_audio_codecs:
ag_supported_features |= AgSdpFeature.WIDE_BAND
return [
sdp.ServiceAttribute(
sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
sdp.DataElement.unsigned_integer_32(service_record_handle),
),
sdp.ServiceAttribute(
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE),
sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
]
),
),
sdp.ServiceAttribute(
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
sdp.DataElement.sequence(
[sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
),
sdp.DataElement.sequence(
[
sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
sdp.DataElement.unsigned_integer_8(rfcomm_channel),
]
),
]
),
),
sdp.ServiceAttribute(
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
sdp.DataElement.sequence(
[
sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE),
sdp.DataElement.unsigned_integer_16(version),
]
)
]
),
),
sdp.ServiceAttribute(
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
sdp.DataElement.unsigned_integer_16(ag_supported_features),
),
]
async def find_hf_sdp_record(
connection: device.Connection,
) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]:
"""Searches a Hands-Free SDP record from remote device.
Args:
connection: ACL connection to make SDP search.
Returns:
Dictionary mapping from channel number to service class UUID list.
"""
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[BT_HANDSFREE_SERVICE],
attribute_ids=[
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
],
)
for attribute_lists in search_result:
channel: Optional[int] = None
version: Optional[ProfileVersion] = None
features: Optional[HfSdpFeature] = None
for attribute in attribute_lists:
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
protocol_descriptor_list = attribute.value.value
channel = protocol_descriptor_list[1].value[1].value
elif (
attribute.id
== sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
):
profile_descriptor_list = attribute.value.value
version = ProfileVersion(profile_descriptor_list[0].value[1].value)
elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
features = HfSdpFeature(attribute.value.value)
if not channel or not version or features is None:
logger.warning(f"Bad result {attribute_lists}.")
return None
return (channel, version, features)
return None
async def find_ag_sdp_record(
connection: device.Connection,
) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]:
"""Searches an Audio-Gateway SDP record from remote device.
Args:
connection: ACL connection to make SDP search.
Returns:
Dictionary mapping from channel number to service class UUID list.
"""
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[BT_HEADSET_AUDIO_GATEWAY_SERVICE],
attribute_ids=[
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
],
)
for attribute_lists in search_result:
channel: Optional[int] = None
version: Optional[ProfileVersion] = None
features: Optional[AgSdpFeature] = None
for attribute in attribute_lists:
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
protocol_descriptor_list = attribute.value.value
channel = protocol_descriptor_list[1].value[1].value
elif (
attribute.id
== sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
):
profile_descriptor_list = attribute.value.value
version = ProfileVersion(profile_descriptor_list[0].value[1].value)
elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
features = AgSdpFeature(attribute.value.value)
if not channel or not version or features is None:
logger.warning(f"Bad result {attribute_lists}.")
return None
return (channel, version, features)
return None
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# ESCO Codec Default Parameters # ESCO Codec Default Parameters
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -20,123 +20,48 @@ import sys
import os import os
import logging import logging
from bumble.colors import color
import bumble.core import bumble.core
from bumble.device import Device from bumble.device import Device
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.core import ( from bumble.core import (
BT_HANDSFREE_SERVICE,
BT_RFCOMM_PROTOCOL_ID,
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
) )
from bumble import rfcomm, hfp from bumble import rfcomm, hfp
from bumble.hci import HCI_SynchronousDataPacket from bumble.hci import HCI_SynchronousDataPacket
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- def _default_configuration() -> hfp.AgConfiguration:
# pylint: disable-next=too-many-nested-blocks return hfp.AgConfiguration(
async def list_rfcomm_channels(device, connection): supported_ag_features=[
# Connect to the SDP Server hfp.AgFeature.HF_INDICATORS,
sdp_client = SDP_Client(connection) hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
await sdp_client.connect() hfp.AgFeature.REJECT_CALL,
hfp.AgFeature.CODEC_NEGOTIATION,
# Search for services that support the Handsfree Profile hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
search_result = await sdp_client.search_attributes(
[BT_HANDSFREE_SERVICE],
[
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
], ],
supported_ag_indicators=[
hfp.AgIndicatorState.call(),
hfp.AgIndicatorState.service(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.signal(),
hfp.AgIndicatorState.roam(),
hfp.AgIndicatorState.battchg(),
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_ag_call_hold_operations=[],
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
) )
print(color('==================================', 'blue'))
print(color('Handsfree Services:', 'yellow'))
rfcomm_channels = []
# pylint: disable-next=too-many-nested-blocks
for attribute_list in search_result:
# Look for the RFCOMM Channel number
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
)
if protocol_descriptor_list:
for protocol_descriptor in protocol_descriptor_list.value:
if len(protocol_descriptor.value) >= 2:
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
print(color('SERVICE:', 'green'))
print(
color(' RFCOMM Channel:', 'cyan'),
protocol_descriptor.value[1].value,
)
rfcomm_channels.append(protocol_descriptor.value[1].value)
# List profiles
bluetooth_profile_descriptor_list = (
ServiceAttribute.find_attribute_in_list(
attribute_list,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
)
if bluetooth_profile_descriptor_list:
if bluetooth_profile_descriptor_list.value:
if (
bluetooth_profile_descriptor_list.value[0].type
== DataElement.SEQUENCE
):
bluetooth_profile_descriptors = (
bluetooth_profile_descriptor_list.value
)
else:
# Sometimes, instead of a list of lists, we just
# find a list. Fix that
bluetooth_profile_descriptors = [
bluetooth_profile_descriptor_list
]
print(color(' Profiles:', 'green'))
for (
bluetooth_profile_descriptor
) in bluetooth_profile_descriptors:
version_major = (
bluetooth_profile_descriptor.value[1].value >> 8
)
version_minor = (
bluetooth_profile_descriptor.value[1].value
& 0xFF
)
print(
' '
f'{bluetooth_profile_descriptor.value[0].value}'
f' - version {version_major}.{version_minor}'
)
# List service classes
service_class_id_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
)
if service_class_id_list:
if service_class_id_list.value:
print(color(' Service Classes:', 'green'))
for service_class_id in service_class_id_list.value:
print(' ', service_class_id.value)
await sdp_client.disconnect()
return rfcomm_channels
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 4: if len(sys.argv) < 4:
print( print(
'Usage: run_hfp_gateway.py <device-config> <transport-spec> ' 'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
@@ -149,11 +74,13 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
await device.power_on() await device.power_on()
@@ -164,13 +91,14 @@ async def main():
print(f'=== Connected to {connection.peer_address}!') print(f'=== Connected to {connection.peer_address}!')
# Get a list of all the Handsfree services (should only be 1) # Get a list of all the Handsfree services (should only be 1)
channels = await list_rfcomm_channels(device, connection) if not (hfp_record := await hfp.find_hf_sdp_record(connection)):
if len(channels) == 0:
print('!!! no service found') print('!!! no service found')
return return
# Pick the first one # Pick the first one
channel = channels[0] channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features}')
# Request authentication # Request authentication
print('*** Authenticating...') print('*** Authenticating...')
@@ -205,51 +133,9 @@ async def main():
device.host.on('sco_packet', on_sco) device.host.on('sco_packet', on_sco)
# Protocol loop (just for testing at this point) ag_protocol = hfp.AgProtocol(session, _default_configuration())
protocol = hfp.HfpProtocol(session)
while True:
line = await protocol.next_line()
if line.startswith('AT+BRSF='): await hci_transport.source.terminated
protocol.send_response_line('+BRSF: 30')
protocol.send_response_line('OK')
elif line.startswith('AT+CIND=?'):
protocol.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
)
protocol.send_response_line('OK')
elif line.startswith('AT+CIND?'):
protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
protocol.send_response_line('OK')
elif line.startswith('AT+CMER='):
protocol.send_response_line('OK')
elif line.startswith('AT+CHLD=?'):
protocol.send_response_line('+CHLD: 0')
protocol.send_response_line('OK')
elif line.startswith('AT+BTRH?'):
protocol.send_response_line('+BTRH: 0')
protocol.send_response_line('OK')
elif line.startswith('AT+CLIP='):
protocol.send_response_line('OK')
elif line.startswith('AT+VGS='):
protocol.send_response_line('OK')
elif line.startswith('AT+BIA='):
protocol.send_response_line('OK')
elif line.startswith('AT+BVRA='):
protocol.send_response_line(
'+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"'
)
elif line.startswith('AT+XEVENT='):
protocol.send_response_line('OK')
elif line.startswith('AT+XAPL='):
protocol.send_response_line('OK')
else:
print(color('UNSUPPORTED AT COMMAND', 'red'))
protocol.send_response_line('ERROR')
await hci_source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -37,7 +37,7 @@ hf_protocol: Optional[HfProtocol] = None
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration): def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
print('*** DLC connected', dlc) print('*** DLC connected', dlc)
global hf_protocol global hf_protocol
hf_protocol = HfProtocol(dlc, configuration) hf_protocol = HfProtocol(dlc, configuration)
@@ -96,7 +96,7 @@ async def main():
# Hands-Free profile configuration. # Hands-Free profile configuration.
# TODO: load configuration from file. # TODO: load configuration from file.
configuration = hfp.Configuration( configuration = hfp.HfConfiguration(
supported_hf_features=[ supported_hf_features=[
hfp.HfFeature.THREE_WAY_CALLING, hfp.HfFeature.THREE_WAY_CALLING,
hfp.HfFeature.REMOTE_VOLUME_CONTROL, hfp.HfFeature.REMOTE_VOLUME_CONTROL,

View File

@@ -83,7 +83,7 @@ build =
build >= 0.7 build >= 0.7
test = test =
pytest >= 8.0 pytest >= 8.0
pytest-asyncio >= 0.21.1 pytest-asyncio >= 0.23.5
pytest-html >= 3.2.0 pytest-html >= 3.2.0
coverage >= 6.4 coverage >= 6.4
development = development =

View File

@@ -19,8 +19,9 @@ import asyncio
import logging import logging
import os import os
import pytest import pytest
import pytest_asyncio
from typing import Tuple from typing import Tuple, Optional
from .test_utils import TwoDevices from .test_utils import TwoDevices
from bumble import core from bumble import core
@@ -35,10 +36,73 @@ from bumble import hci
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def _default_hf_configuration() -> hfp.HfConfiguration:
return hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.CODEC_NEGOTIATION,
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
hfp.HfFeature.HF_INDICATORS,
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_audio_codecs=[
hfp.AudioCodec.CVSD,
hfp.AudioCodec.MSBC,
],
)
# -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return hfp.HfSdpFeature.WIDE_BAND
# -----------------------------------------------------------------------------
def _default_ag_configuration() -> hfp.AgConfiguration:
return hfp.AgConfiguration(
supported_ag_features=[
hfp.AgFeature.HF_INDICATORS,
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
hfp.AgFeature.REJECT_CALL,
hfp.AgFeature.CODEC_NEGOTIATION,
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
],
supported_ag_indicators=[
hfp.AgIndicatorState.call(),
hfp.AgIndicatorState.service(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.signal(),
hfp.AgIndicatorState.roam(),
hfp.AgIndicatorState.battchg(),
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_ag_call_hold_operations=[],
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
)
# -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return hfp.AgSdpFeature.WIDE_BAND | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def make_hfp_connections( async def make_hfp_connections(
hf_config: hfp.Configuration, hf_config: Optional[hfp.HfConfiguration] = None,
) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]: ag_config: Optional[hfp.AgConfiguration] = None,
):
if not hf_config:
hf_config = _default_hf_configuration()
if not ag_config:
ag_config = _default_ag_configuration()
# Setup devices # Setup devices
devices = TwoDevices() devices = TwoDevices()
await devices.setup_connection() await devices.setup_connection()
@@ -55,38 +119,200 @@ async def make_hfp_connections(
# Setup HFP connection # Setup HFP connection
hf = hfp.HfProtocol(client_dlc, hf_config) hf = hfp.HfProtocol(client_dlc, hf_config)
ag = hfp.HfpProtocol(server_dlc) ag = hfp.AgProtocol(server_dlc, ag_config)
return hf, ag
await hf.initiate_slc()
return (hf, ag)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest_asyncio.fixture
async def hfp_connections():
hf, ag = await make_hfp_connections()
hf_loop_task = asyncio.create_task(hf.run())
try:
yield (hf, ag)
finally:
# Close the coroutine.
hf.unsolicited_queue.put_nowait(None)
await hf_loop_task
# -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_slc(): async def test_slc_with_minimal_features():
hf_config = hfp.Configuration( hf, ag = await make_hfp_connections(
supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[] hfp.HfConfiguration(
) supported_audio_codecs=[],
hf, ag = await make_hfp_connections(hf_config) supported_hf_features=[],
supported_hf_indicators=[],
async def ag_loop(): ),
while line := await ag.next_line(): hfp.AgConfiguration(
if line.startswith('AT+BRSF'): supported_ag_call_hold_operations=[],
ag.send_response_line('+BRSF: 0') supported_ag_features=[],
elif line.startswith('AT+CIND=?'): supported_ag_indicators=[
ag.send_response_line( hfp.AgIndicatorState(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' indicator=hfp.AgIndicator.CALL,
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' supported_values={0, 1},
'("callheld",(0-2))' current_status=0,
) )
elif line.startswith('AT+CIND?'): ],
ag.send_response_line('+CIND: 0,0,1,4,1,5,0') supported_hf_indicators=[],
ag.send_response_line('OK') supported_audio_codecs=[],
),
)
ag_task = asyncio.create_task(ag_loop()) assert hf.supported_ag_features == ag.supported_ag_features
assert hf.supported_hf_features == ag.supported_hf_features
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
assert a.indicator == b.indicator
assert a.current_status == b.current_status
await hf.initiate_slc()
ag_task.cancel() # -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
assert hf.supported_ag_features == ag.supported_ag_features
assert hf.supported_hf_features == ag.supported_hf_features
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
assert a.indicator == b.indicator
assert a.current_status == b.current_status
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
hf.on('ag_indicator', future.set_result)
ag.update_ag_indicator(hfp.AgIndicator.CALL, 1)
indicator: hfp.AgIndicatorState = await future
assert indicator.current_status == 1
assert indicator.indicator == hfp.AgIndicator.CALL
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hf_indicator', future.set_result)
await hf.execute_command('AT+BIEV=2,100')
indicator: hfp.HfIndicatorState = await future
assert indicator.current_status == 100
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_codec_negotiation(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
hf.on('codec_negotiation', futures[0].set_result)
ag.on('codec_negotiation', futures[1].set_result)
await ag.negotiate_codec(hfp.AudioCodec.MSBC)
assert await futures[0] == await futures[1]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
NUMBER = 'ATD123456789'
future = asyncio.get_running_loop().create_future()
ag.on('dial', future.set_result)
await hf.execute_command(f'ATD{NUMBER}')
number: str = await future
assert number == NUMBER
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('answer', lambda: future.set_result(None))
await hf.answer_incoming_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reject_incoming_call(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: future.set_result(None))
await hf.reject_incoming_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: future.set_result(None))
await hf.terminate_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_sdp_record():
devices = TwoDevices()
await devices.setup_connection()
devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records(
1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8
)
assert await hfp.find_hf_sdp_record(devices.connections[1]) == (
2,
hfp.ProfileVersion.V1_8,
_default_hf_sdp_features(),
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_sdp_record():
devices = TwoDevices()
await devices.setup_connection()
devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records(
1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8
)
assert await hfp.find_ag_sdp_record(devices.connections[1]) == (
2,
hfp.ProfileVersion.V1_8,
_default_ag_sdp_features(),
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------