diff --git a/tests/hfp_test.py b/tests/hfp_test.py new file mode 100644 index 00000000..bc6ccaf9 --- /dev/null +++ b/tests/hfp_test.py @@ -0,0 +1,100 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import logging +import os +import pytest + +from typing import Tuple + +from .test_utils import TwoDevices +from bumble import hfp +from bumble import rfcomm + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +async def make_hfp_connections( + hf_config: hfp.Configuration, +) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]: + # Setup devices + devices = TwoDevices() + await devices.setup_connection() + + # Setup RFCOMM channel + wait_dlc = asyncio.get_running_loop().create_future() + rfcomm_channel = rfcomm.Server(devices.devices[0]).listen( + lambda dlc: wait_dlc.set_result(dlc) + ) + assert devices.connections[0] + assert devices.connections[1] + client_mux = await rfcomm.Client(devices.devices[1], devices.connections[1]).start() + + client_dlc = await client_mux.open_dlc(rfcomm_channel) + server_dlc = await wait_dlc + + # Setup HFP connnection + hf = hfp.HfProtocol(client_dlc, hf_config) + ag = hfp.HfpProtocol(server_dlc) + return hf, ag + + +# ----------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_slc(): + hf_config = hfp.Configuration( + supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[] + ) + hf, ag = await make_hfp_connections(hf_config) + + async def ag_loop(): + while line := await ag.next_line(): + if line.startswith('AT+BRSF'): + ag.send_response_line('+BRSF: 0') + elif line.startswith('AT+CIND=?'): + ag.send_response_line( + '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' + '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' + '("callheld",(0-2))' + ) + elif line.startswith('AT+CIND?'): + ag.send_response_line('+CIND: 0,0,1,4,1,5,0') + ag.send_response_line('OK') + + ag_task = asyncio.create_task(ag_loop()) + + await hf.initiate_slc() + ag_task.cancel() + + +# ----------------------------------------------------------------------------- +async def run(): + await test_slc() + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + asyncio.run(run())