Merge pull request #272 from zxzxwu/gfp

Bring HfpProtocol back
This commit is contained in:
Gilles Boccon-Gibod
2023-09-07 13:03:36 -07:00
committed by GitHub
3 changed files with 162 additions and 61 deletions

View File

@@ -15,16 +15,19 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import collections.abc
import logging import logging
import asyncio import asyncio
import dataclasses import dataclasses
import enum import enum
import traceback import traceback
import warnings
from typing import Dict, List, Union, Set from typing import Dict, List, Union, Set
from . import at from . import at
from . import rfcomm from . import rfcomm
from bumble.colors import color
from bumble.core import ( from bumble.core import (
ProtocolError, ProtocolError,
BT_GENERIC_AUDIO_SERVICE, BT_GENERIC_AUDIO_SERVICE,
@@ -49,6 +52,62 @@ from bumble.sdp import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Protocol Support
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class HfpProtocol:
dlc: rfcomm.DLC
buffer: str
lines: collections.deque
lines_available: asyncio.Event
def __init__(self, dlc: rfcomm.DLC) -> None:
warnings.warn("See HfProtocol", DeprecationWarning)
self.dlc = dlc
self.buffer = ''
self.lines = collections.deque()
self.lines_available = asyncio.Event()
dlc.sink = self.feed
def feed(self, data: Union[bytes, str]) -> None:
# Convert the data to a string if needed
if isinstance(data, bytes):
data = data.decode('utf-8')
logger.debug(f'<<< Data received: {data}')
# Add to the buffer and look for lines
self.buffer += data
while (separator := self.buffer.find('\r')) >= 0:
line = self.buffer[:separator].strip()
self.buffer = self.buffer[separator + 1 :]
if len(line) > 0:
self.on_line(line)
def on_line(self, line: str) -> None:
self.lines.append(line)
self.lines_available.set()
def send_command_line(self, line: str) -> None:
logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write(line + '\r')
def send_response_line(self, line: str) -> None:
logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write('\r\n' + line + '\r\n')
async def next_line(self) -> str:
await self.lines_available.wait()
line = self.lines.popleft()
if not self.lines:
self.lines_available.clear()
logger.debug(color(f'<<< {line}', 'green'))
return line
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Normative protocol definitions # Normative protocol definitions
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -16,11 +16,9 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import asyncio import asyncio
import collections
import sys import sys
import os import os
import logging import logging
from typing import Union
from bumble.colors import color from bumble.colors import color
@@ -32,8 +30,7 @@ from bumble.core import (
BT_RFCOMM_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID,
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
) )
from bumble import rfcomm from bumble import rfcomm, hfp
from bumble.rfcomm import Client
from bumble.sdp import ( from bumble.sdp import (
Client as SDP_Client, Client as SDP_Client,
DataElement, DataElement,
@@ -47,61 +44,6 @@ from bumble.sdp import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Protocol Support
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class HfpProtocol:
dlc: rfcomm.DLC
buffer: str
lines: collections.deque
lines_available: asyncio.Event
def __init__(self, dlc: rfcomm.DLC) -> None:
self.dlc = dlc
self.buffer = ''
self.lines = collections.deque()
self.lines_available = asyncio.Event()
dlc.sink = self.feed
def feed(self, data: Union[bytes, str]) -> None:
# Convert the data to a string if needed
if isinstance(data, bytes):
data = data.decode('utf-8')
logger.debug(f'<<< Data received: {data}')
# Add to the buffer and look for lines
self.buffer += data
while (separator := self.buffer.find('\r')) >= 0:
line = self.buffer[:separator].strip()
self.buffer = self.buffer[separator + 1 :]
if len(line) > 0:
self.on_line(line)
def on_line(self, line: str) -> None:
self.lines.append(line)
self.lines_available.set()
def send_command_line(self, line: str) -> None:
logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write(line + '\r')
def send_response_line(self, line: str) -> None:
logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write('\r\n' + line + '\r\n')
async def next_line(self) -> str:
await self.lines_available.wait()
line = self.lines.popleft()
if not self.lines:
self.lines_available.clear()
logger.debug(color(f'<<< {line}', 'green'))
return line
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# pylint: disable-next=too-many-nested-blocks # pylint: disable-next=too-many-nested-blocks
async def list_rfcomm_channels(device, connection): async def list_rfcomm_channels(device, connection):
@@ -241,7 +183,7 @@ async def main():
# Create a client and start it # Create a client and start it
print('@@@ Starting to RFCOMM client...') print('@@@ Starting to RFCOMM client...')
rfcomm_client = Client(device, connection) rfcomm_client = rfcomm.Client(device, connection)
rfcomm_mux = await rfcomm_client.start() rfcomm_mux = await rfcomm_client.start()
print('@@@ Started') print('@@@ Started')
@@ -256,7 +198,7 @@ async def main():
return return
# Protocol loop (just for testing at this point) # Protocol loop (just for testing at this point)
protocol = HfpProtocol(session) protocol = hfp.HfpProtocol(session)
while True: while True:
line = await protocol.next_line() line = await protocol.next_line()

100
tests/hfp_test.py Normal file
View File

@@ -0,0 +1,100 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import pytest
from typing import Tuple
from .test_utils import TwoDevices
from bumble import hfp
from bumble import rfcomm
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
async def make_hfp_connections(
hf_config: hfp.Configuration,
) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]:
# Setup devices
devices = TwoDevices()
await devices.setup_connection()
# Setup RFCOMM channel
wait_dlc = asyncio.get_running_loop().create_future()
rfcomm_channel = rfcomm.Server(devices.devices[0]).listen(
lambda dlc: wait_dlc.set_result(dlc)
)
assert devices.connections[0]
assert devices.connections[1]
client_mux = await rfcomm.Client(devices.devices[1], devices.connections[1]).start()
client_dlc = await client_mux.open_dlc(rfcomm_channel)
server_dlc = await wait_dlc
# Setup HFP connnection
hf = hfp.HfProtocol(client_dlc, hf_config)
ag = hfp.HfpProtocol(server_dlc)
return hf, ag
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc():
hf_config = hfp.Configuration(
supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[]
)
hf, ag = await make_hfp_connections(hf_config)
async def ag_loop():
while line := await ag.next_line():
if line.startswith('AT+BRSF'):
ag.send_response_line('+BRSF: 0')
elif line.startswith('AT+CIND=?'):
ag.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
)
elif line.startswith('AT+CIND?'):
ag.send_response_line('+CIND: 0,0,1,4,1,5,0')
ag.send_response_line('OK')
ag_task = asyncio.create_task(ag_loop())
await hf.initiate_slc()
ag_task.cancel()
# -----------------------------------------------------------------------------
async def run():
await test_slc()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())