HFP: Add example and fix AG errors

This commit is contained in:
Josh Wu
2024-05-02 19:28:42 +08:00
parent 26e6650038
commit ccff32102f
4 changed files with 589 additions and 61 deletions

View File

@@ -50,7 +50,7 @@ from bumble.core import (
ProtocolError,
BT_GENERIC_AUDIO_SERVICE,
BT_HANDSFREE_SERVICE,
BT_HEADSET_AUDIO_GATEWAY_SERVICE,
BT_HANDSFREE_AUDIO_GATEWAY_SERVICE,
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
)
@@ -1156,7 +1156,7 @@ class AgProtocol(pyee.EventEmitter):
active_codec: AudioCodec
hf_indicator: When HF update their indicators, notify the new state.
Args:
hf_indicator: HfIndicator
hf_indicator: HfIndicatorState
codec_connection_request: Emit when HF sends AT+BCC to request codec connection.
answer: Emit when HF sends ATA to answer phone call.
hang_up: Emit when HF sends AT+CHUP to hang up phone call.
@@ -1168,7 +1168,12 @@ class AgProtocol(pyee.EventEmitter):
Args:
operation: CallHoldOperation
call_index: Optional[int]
speaker_volume: Emitted when AG update speaker volume autonomously.
Args:
volume: Int
microphone_volume: Emitted when AG update microphone volume autonomously.
Args:
volume: Int
"""
supported_hf_features: int
@@ -1191,6 +1196,7 @@ class AgProtocol(pyee.EventEmitter):
inband_ringtone_enabled: bool
cme_error_enabled: bool
cli_notification_enabled: bool
call_waiting_enabled: bool
_remained_slc_setup_features: Set[HfFeature]
def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
@@ -1218,6 +1224,7 @@ class AgProtocol(pyee.EventEmitter):
self.indicator_report_enabled = False
self.cme_error_enabled = False
self.cli_notification_enabled = False
self.call_waiting_enabled = False
self.hf_indicators = collections.OrderedDict()
@@ -1465,7 +1472,12 @@ class AgProtocol(pyee.EventEmitter):
display: Optional[bytes] = None,
indicator: bytes = b'',
) -> None:
if int(mode) != 3 or keypad or display or int(indicator) not in (0, 1):
if (
int(mode) != 3
or (keypad and int(keypad))
or (display and int(display))
or int(indicator) not in (0, 1)
):
logger.error(
f'Unexpected values: mode={mode!r}, keypad={keypad!r}, '
f'display={display!r}, indicator={indicator!r}'
@@ -1479,6 +1491,10 @@ class AgProtocol(pyee.EventEmitter):
self.cme_error_enabled = bool(int(enabled))
self.send_ok()
def _on_ccwa(self, enabled: bytes) -> None:
self.call_waiting_enabled = bool(int(enabled))
self.send_ok()
def _on_bind(self, *args) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
@@ -1578,6 +1594,15 @@ class AgProtocol(pyee.EventEmitter):
if not self.supports_hf_feature(HfFeature.CLI_PRESENTATION_CAPABILITY):
logger.error('Remote doesn not support CLI but sends AT+CLIP')
self.cli_notification_enabled = True if enabled == b'1' else False
self.send_ok()
def _on_vgs(self, level: bytes) -> None:
self.emit('speaker_volume', int(level))
self.send_ok()
def _on_vgm(self, level: bytes) -> None:
self.emit('microphone_volume', int(level))
self.send_ok()
# -----------------------------------------------------------------------------
@@ -1761,7 +1786,7 @@ def make_ag_sdp_records(
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE),
sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
]
),
@@ -1788,7 +1813,7 @@ def make_ag_sdp_records(
[
sdp.DataElement.sequence(
[
sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE),
sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
sdp.DataElement.unsigned_integer_16(version),
]
)
@@ -1820,6 +1845,7 @@ async def find_hf_sdp_record(
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
],
)
for attribute_lists in search_result:
@@ -1839,10 +1865,17 @@ async def find_hf_sdp_record(
version = ProfileVersion(profile_descriptor_list[0].value[1].value)
elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
features = HfSdpFeature(attribute.value.value)
if not channel or not version or features is None:
logger.warning(f"Bad result {attribute_lists}.")
return None
return (channel, version, features)
elif attribute.id == sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:
class_id_list = attribute.value.value
uuid = class_id_list[0].value
# AG record may also contain HF UUID in its profile descriptor list.
# If found, skip this record.
if uuid == BT_HANDSFREE_AUDIO_GATEWAY_SERVICE:
channel, version, features = (None, None, None)
break
if channel is not None and version is not None and features is not None:
return (channel, version, features)
return None
@@ -1859,7 +1892,7 @@ async def find_ag_sdp_record(
"""
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[BT_HEADSET_AUDIO_GATEWAY_SERVICE],
uuids=[BT_HANDSFREE_AUDIO_GATEWAY_SERVICE],
attribute_ids=[
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,