From 39db278f2edf5f2e228054719df52f7797eac93b Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Mon, 5 Jun 2023 16:03:02 +0800 Subject: [PATCH] Add typing for HFP --- bumble/hfp.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/bumble/hfp.py b/bumble/hfp.py index 7bb9f084..9080a55b 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -18,10 +18,11 @@ import logging import asyncio import collections +from typing import Union +from . import rfcomm from .colors import color - # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- @@ -34,7 +35,12 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- class HfpProtocol: - def __init__(self, dlc): + dlc: rfcomm.DLC + buffer: str + lines: collections.deque + lines_available: asyncio.Event + + def __init__(self, dlc: rfcomm.DLC) -> None: self.dlc = dlc self.buffer = '' self.lines = collections.deque() @@ -42,7 +48,7 @@ class HfpProtocol: dlc.sink = self.feed - def feed(self, data): + def feed(self, data: Union[bytes, str]) -> None: # Convert the data to a string if needed if isinstance(data, bytes): data = data.decode('utf-8') @@ -57,19 +63,19 @@ class HfpProtocol: if len(line) > 0: self.on_line(line) - def on_line(self, line): + def on_line(self, line: str) -> None: self.lines.append(line) self.lines_available.set() - def send_command_line(self, line): + def send_command_line(self, line: str) -> None: logger.debug(color(f'>>> {line}', 'yellow')) self.dlc.write(line + '\r') - def send_response_line(self, line): + def send_response_line(self, line: str) -> None: logger.debug(color(f'>>> {line}', 'yellow')) self.dlc.write('\r\n' + line + '\r\n') - async def next_line(self): + async def next_line(self) -> str: await self.lines_available.wait() line = self.lines.popleft() if not self.lines: @@ -77,7 +83,7 @@ class HfpProtocol: logger.debug(color(f'<<< {line}', 'green')) return line - async def initialize_service(self): + async def initialize_service(self) -> None: # Perform Service Level Connection Initialization self.send_command_line('AT+BRSF=2072') # Retrieve Supported Features await (self.next_line())