# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import os import pytest from typing import Tuple from .test_utils import TwoDevices from bumble import core from bumble import device from bumble import hfp from bumble import rfcomm from bumble import hci # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- async def make_hfp_connections( hf_config: hfp.Configuration, ) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]: # Setup devices devices = TwoDevices() await devices.setup_connection() # Setup RFCOMM channel wait_dlc = asyncio.get_running_loop().create_future() rfcomm_channel = rfcomm.Server(devices.devices[0]).listen(wait_dlc.set_result) assert devices.connections[0] assert devices.connections[1] client_mux = await rfcomm.Client(devices.connections[1]).start() client_dlc = await client_mux.open_dlc(rfcomm_channel) server_dlc = await wait_dlc # Setup HFP connection hf = hfp.HfProtocol(client_dlc, hf_config) ag = hfp.HfpProtocol(server_dlc) return hf, ag # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_slc(): hf_config = hfp.Configuration( supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[] ) hf, ag = await make_hfp_connections(hf_config) async def ag_loop(): while line := await ag.next_line(): if line.startswith('AT+BRSF'): ag.send_response_line('+BRSF: 0') elif line.startswith('AT+CIND=?'): ag.send_response_line( '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' '("callheld",(0-2))' ) elif line.startswith('AT+CIND?'): ag.send_response_line('+CIND: 0,0,1,4,1,5,0') ag.send_response_line('OK') ag_task = asyncio.create_task(ag_loop()) await hf.initiate_slc() ag_task.cancel() # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_sco_setup(): devices = TwoDevices() # Enable Classic connections devices[0].classic_enabled = True devices[1].classic_enabled = True # Start await devices[0].power_on() await devices[1].power_on() connections = await asyncio.gather( devices[0].connect( devices[1].public_address, transport=core.BT_BR_EDR_TRANSPORT ), devices[1].accept(devices[0].public_address), ) def on_sco_request(_connection: device.Connection, _link_type: int): connections[1].abort_on( 'disconnection', devices[1].send_command( hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command( bd_addr=connections[1].peer_address, **hfp.ESCO_PARAMETERS[ hfp.DefaultCodecParameters.ESCO_CVSD_S1 ].asdict(), ) ), ) devices[1].on('sco_request', on_sco_request) sco_connections = [ asyncio.get_running_loop().create_future(), asyncio.get_running_loop().create_future(), ] devices[0].on( 'sco_connection', lambda sco_link: sco_connections[0].set_result(sco_link) ) devices[1].on( 'sco_connection', lambda sco_link: sco_connections[1].set_result(sco_link) ) await devices[0].send_command( hci.HCI_Enhanced_Setup_Synchronous_Connection_Command( connection_handle=connections[0].handle, **hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1].asdict(), ) ) await asyncio.gather(*sco_connections) # ----------------------------------------------------------------------------- async def run(): await test_slc() # ----------------------------------------------------------------------------- if __name__ == '__main__': logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) asyncio.run(run())