Add typing for HFP

This commit is contained in:
Josh Wu
2023-06-05 16:03:02 +08:00
parent a74c39dc2b
commit 39db278f2e

View File

@@ -18,10 +18,11 @@
import logging import logging
import asyncio import asyncio
import collections import collections
from typing import Union
from . import rfcomm
from .colors import color from .colors import color
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -34,7 +35,12 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HfpProtocol: class HfpProtocol:
def __init__(self, dlc): dlc: rfcomm.DLC
buffer: str
lines: collections.deque
lines_available: asyncio.Event
def __init__(self, dlc: rfcomm.DLC) -> None:
self.dlc = dlc self.dlc = dlc
self.buffer = '' self.buffer = ''
self.lines = collections.deque() self.lines = collections.deque()
@@ -42,7 +48,7 @@ class HfpProtocol:
dlc.sink = self.feed dlc.sink = self.feed
def feed(self, data): def feed(self, data: Union[bytes, str]) -> None:
# Convert the data to a string if needed # Convert the data to a string if needed
if isinstance(data, bytes): if isinstance(data, bytes):
data = data.decode('utf-8') data = data.decode('utf-8')
@@ -57,19 +63,19 @@ class HfpProtocol:
if len(line) > 0: if len(line) > 0:
self.on_line(line) self.on_line(line)
def on_line(self, line): def on_line(self, line: str) -> None:
self.lines.append(line) self.lines.append(line)
self.lines_available.set() self.lines_available.set()
def send_command_line(self, line): def send_command_line(self, line: str) -> None:
logger.debug(color(f'>>> {line}', 'yellow')) logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write(line + '\r') self.dlc.write(line + '\r')
def send_response_line(self, line): def send_response_line(self, line: str) -> None:
logger.debug(color(f'>>> {line}', 'yellow')) logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write('\r\n' + line + '\r\n') self.dlc.write('\r\n' + line + '\r\n')
async def next_line(self): async def next_line(self) -> str:
await self.lines_available.wait() await self.lines_available.wait()
line = self.lines.popleft() line = self.lines.popleft()
if not self.lines: if not self.lines:
@@ -77,7 +83,7 @@ class HfpProtocol:
logger.debug(color(f'<<< {line}', 'green')) logger.debug(color(f'<<< {line}', 'green'))
return line return line
async def initialize_service(self): async def initialize_service(self) -> None:
# Perform Service Level Connection Initialization # Perform Service Level Connection Initialization
self.send_command_line('AT+BRSF=2072') # Retrieve Supported Features self.send_command_line('AT+BRSF=2072') # Retrieve Supported Features
await (self.next_line()) await (self.next_line())