Bring HfpProtocol back

This commit is contained in:
Josh Wu
2023-09-07 22:39:05 +08:00
parent 9303f4fc5b
commit 3852aa056b
2 changed files with 62 additions and 61 deletions

View File

@@ -15,16 +15,19 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import collections.abc
import logging
import asyncio
import dataclasses
import enum
import traceback
import warnings
from typing import Dict, List, Union, Set
from . import at
from . import rfcomm
from bumble.colors import color
from bumble.core import (
ProtocolError,
BT_GENERIC_AUDIO_SERVICE,
@@ -49,6 +52,62 @@ from bumble.sdp import (
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Protocol Support
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class HfpProtocol:
dlc: rfcomm.DLC
buffer: str
lines: collections.deque
lines_available: asyncio.Event
def __init__(self, dlc: rfcomm.DLC) -> None:
warnings.warn("See HfProtocol", DeprecationWarning)
self.dlc = dlc
self.buffer = ''
self.lines = collections.deque()
self.lines_available = asyncio.Event()
dlc.sink = self.feed
def feed(self, data: Union[bytes, str]) -> None:
# Convert the data to a string if needed
if isinstance(data, bytes):
data = data.decode('utf-8')
logger.debug(f'<<< Data received: {data}')
# Add to the buffer and look for lines
self.buffer += data
while (separator := self.buffer.find('\r')) >= 0:
line = self.buffer[:separator].strip()
self.buffer = self.buffer[separator + 1 :]
if len(line) > 0:
self.on_line(line)
def on_line(self, line: str) -> None:
self.lines.append(line)
self.lines_available.set()
def send_command_line(self, line: str) -> None:
logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write(line + '\r')
def send_response_line(self, line: str) -> None:
logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write('\r\n' + line + '\r\n')
async def next_line(self) -> str:
await self.lines_available.wait()
line = self.lines.popleft()
if not self.lines:
self.lines_available.clear()
logger.debug(color(f'<<< {line}', 'green'))
return line
# -----------------------------------------------------------------------------
# Normative protocol definitions
# -----------------------------------------------------------------------------