# Mirror of https://github.com/google/bumble.git (synced 2026-04-17 00:35:31 +00:00).
# 101 lines, 3.3 KiB, Python.
# Copyright 2021-2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import pytest
|
|
|
|
from typing import Tuple
|
|
|
|
from .test_utils import TwoDevices
|
|
from bumble import hfp
|
|
from bumble import rfcomm
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Logging
|
|
# -----------------------------------------------------------------------------
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def make_hfp_connections(
    hf_config: hfp.Configuration,
) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]:
    """Create two connected devices and wrap an RFCOMM DLC pair in HFP protocols.

    Args:
        hf_config: Configuration passed to the ``hfp.HfProtocol`` instance.

    Returns:
        A ``(hf, ag)`` tuple. ``hf`` is an ``hfp.HfProtocol`` built on the DLC
        opened by the RFCOMM client (device 1); ``ag`` is an
        ``hfp.HfpProtocol`` built on the DLC accepted by the RFCOMM server
        (device 0).
    """
    # Bring up the device pair and connect them.
    pair = TwoDevices()
    await pair.setup_connection()

    # Device 0 listens for RFCOMM; the accepted DLC is handed back through a
    # future that the acceptor callback resolves.
    accepted_dlc = asyncio.get_running_loop().create_future()
    server = rfcomm.Server(pair.devices[0])
    channel = server.listen(lambda dlc: accepted_dlc.set_result(dlc))

    assert pair.connections[0]
    assert pair.connections[1]

    # Device 1 starts an RFCOMM client and opens a DLC on the value returned
    # by listen().
    client = rfcomm.Client(pair.devices[1], pair.connections[1])
    multiplexer = await client.start()
    hf_dlc = await multiplexer.open_dlc(channel)
    ag_dlc = await accepted_dlc

    # Wrap each end of the DLC in its HFP protocol object (HF first, then AG).
    hf_protocol = hfp.HfProtocol(hf_dlc, hf_config)
    ag_protocol = hfp.HfpProtocol(ag_dlc)
    return hf_protocol, ag_protocol
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_slc():
    """Check that ``hf.initiate_slc()`` completes against a scripted AG responder.

    The AG side is driven by a background task that reads lines from the AG
    protocol and answers the ``AT+BRSF``, ``AT+CIND=?`` and ``AT+CIND?``
    commands with canned responses, following every received line with ``OK``.
    """
    hf_config = hfp.Configuration(
        supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[]
    )
    hf, ag = await make_hfp_connections(hf_config)

    async def ag_loop():
        # Keep answering until next_line() yields a falsy value or the task is
        # cancelled.
        while line := await ag.next_line():
            if line.startswith('AT+BRSF'):
                ag.send_response_line('+BRSF: 0')
            elif line.startswith('AT+CIND=?'):
                ag.send_response_line(
                    '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
                    '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
                    '("callheld",(0-2))'
                )
            elif line.startswith('AT+CIND?'):
                ag.send_response_line('+CIND: 0,0,1,4,1,5,0')
            # Every received line, recognized or not, gets a final OK.
            ag.send_response_line('OK')

    ag_task = asyncio.create_task(ag_loop())

    try:
        await hf.initiate_slc()
    finally:
        # Cancel the responder even when initiate_slc() raises, so the
        # background task is never left pending after the test ends.
        ag_task.cancel()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def run():
    """Run this file's test directly, for use when executed as a script."""
    await test_slc()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
if __name__ == '__main__':
    # Log level comes from BUMBLE_LOGLEVEL (upper-cased), defaulting to INFO.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(run())
|