From 5fba6b1caedf9e0c16220ef0cc19b76e7bb7f387 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Thu, 27 Jul 2023 19:19:11 +0800 Subject: [PATCH] Complete typing in RFCOMM --- bumble/rfcomm.py | 77 +++++++++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index 0176a78..579fad9 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -19,12 +19,15 @@ import logging import asyncio from pyee import EventEmitter -from typing import Optional, Tuple, Callable, Dict, Union +from typing import Optional, Tuple, Callable, Dict, Union, TYPE_CHECKING from . import core, l2cap from .colors import color from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError +if TYPE_CHECKING: + from bumble.device import Device, Connection + # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- @@ -149,9 +152,9 @@ class RFCOMM_Frame: return RFCOMM_FRAME_TYPE_NAMES[self.type] @staticmethod - def parse_mcc(data) -> Tuple[int, int, bytes]: + def parse_mcc(data) -> Tuple[int, bool, bytes]: mcc_type = data[0] >> 2 - c_r = (data[0] >> 1) & 1 + c_r = bool((data[0] >> 1) & 1) length = data[1] if data[1] & 1: length >>= 1 @@ -215,7 +218,7 @@ class RFCOMM_Frame: return frame - def __bytes__(self): + def __bytes__(self) -> bytes: return ( bytes([self.address, self.control]) + self.length @@ -223,7 +226,7 @@ class RFCOMM_Frame: + bytes([self.fcs]) ) - def __str__(self): + def __str__(self) -> str: return ( f'{color(self.type_name(), "yellow")}' f'(c/r={self.c_r},' @@ -253,7 +256,7 @@ class RFCOMM_MCC_PN: max_frame_size: int, max_retransmissions: int, window_size: int, - ): + ) -> None: self.dlci = dlci self.cl = cl self.priority = priority @@ -263,7 +266,7 @@ class RFCOMM_MCC_PN: self.window_size = window_size @staticmethod - def from_bytes(data: bytes): + def from_bytes(data: bytes) -> 'RFCOMM_MCC_PN': return RFCOMM_MCC_PN( dlci=data[0], cl=data[1], @@ -274,7 +277,7 @@ class RFCOMM_MCC_PN: window_size=data[7], ) - def __bytes__(self): + def __bytes__(self) -> bytes: return bytes( [ self.dlci & 0xFF, @@ -288,7 +291,7 @@ class RFCOMM_MCC_PN: ] ) - def __str__(self): + def __str__(self) -> str: return ( f'PN(dlci={self.dlci},' f'cl={self.cl},' @@ -309,7 +312,9 @@ class RFCOMM_MCC_MSC: ic: int dv: int - def __init__(self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int): + def __init__( + self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int + ) -> None: self.dlci = dlci self.fc = fc self.rtc = rtc @@ -318,7 +323,7 @@ class RFCOMM_MCC_MSC: self.dv = dv @staticmethod - def from_bytes(data: bytes): + def from_bytes(data: bytes) -> 'RFCOMM_MCC_MSC': return RFCOMM_MCC_MSC( dlci=data[0] >> 2, fc=data[1] >> 1 & 1, @@ -328,7 +333,7 @@ class RFCOMM_MCC_MSC: dv=data[1] >> 7 & 1, ) - def __bytes__(self): + def __bytes__(self) -> bytes: return bytes( [ (self.dlci << 2) | 3, @@ -341,7 +346,7 @@ class RFCOMM_MCC_MSC: ] ) - def __str__(self): + def __str__(self) -> str: return ( f'MSC(dlci={self.dlci},' f'fc={self.fc},' @@ -375,8 +380,12 @@ class DLC(EventEmitter): sink: Optional[Callable[[bytes], None]] def __init__( - self, multiplexer, dlci: int, max_frame_size: int, initial_tx_credits: int - ): + self, + multiplexer: 'Multiplexer', + dlci: int, + max_frame_size: int, + initial_tx_credits: int, + ) -> None: super().__init__() self.multiplexer = multiplexer self.dlci = dlci @@ -413,7 +422,7 @@ class DLC(EventEmitter): handler = getattr(self, f'on_{frame.type_name()}_frame'.lower()) handler(frame) - def on_sabm_frame(self, _frame) -> None: + def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None: if self.state != DLC.CONNECTING: logger.warning( color('!!! received SABM when not in CONNECTING state', 'red') @@ -433,7 +442,7 @@ class DLC(EventEmitter): self.change_state(DLC.CONNECTED) self.emit('open') - def on_ua_frame(self, _frame) -> None: + def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: if self.state != DLC.CONNECTING: logger.warning( color('!!! received SABM when not in CONNECTING state', 'red') @@ -451,11 +460,11 @@ class DLC(EventEmitter): self.change_state(DLC.CONNECTED) self.multiplexer.on_dlc_open_complete(self) - def on_dm_frame(self, frame) -> None: + def on_dm_frame(self, frame: RFCOMM_Frame) -> None: # TODO: handle all states pass - def on_disc_frame(self, _frame) -> None: + def on_disc_frame(self, _frame: RFCOMM_Frame) -> None: # TODO: handle all states self.send_frame(RFCOMM_Frame.ua(c_r=1 - self.c_r, dlci=self.dlci)) @@ -489,10 +498,10 @@ class DLC(EventEmitter): # Check if there's anything to send (including credits) self.process_tx() - def on_ui_frame(self, frame) -> None: + def on_ui_frame(self, frame: RFCOMM_Frame) -> None: pass - def on_mcc_msc(self, c_r, msc) -> None: + def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None: if c_r: # Command logger.debug(f'<<< MCC MSC Command: {msc}') @@ -592,7 +601,7 @@ class DLC(EventEmitter): # TODO pass - def __str__(self): + def __str__(self) -> str: return f'DLC(dlci={self.dlci},state={self.state_name(self.state)})' @@ -679,14 +688,14 @@ class Multiplexer(EventEmitter): handler = getattr(self, f'on_{frame.type_name()}_frame'.lower()) handler(frame) - def on_sabm_frame(self, _frame) -> None: + def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None: if self.state != Multiplexer.INIT: logger.debug('not in INIT state, ignoring SABM') return self.change_state(Multiplexer.CONNECTED) self.send_frame(RFCOMM_Frame.ua(c_r=1, dlci=0)) - def on_ua_frame(self, _frame) -> None: + def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: if self.state == Multiplexer.CONNECTING: self.change_state(Multiplexer.CONNECTED) if self.connection_result: @@ -698,7 +707,7 @@ class Multiplexer(EventEmitter): self.disconnection_result.set_result(None) self.disconnection_result = None - def on_dm_frame(self, _frame) -> None: + def on_dm_frame(self, _frame: RFCOMM_Frame) -> None: if self.state == Multiplexer.OPENING: self.change_state(Multiplexer.CONNECTED) if self.open_result: @@ -713,7 +722,7 @@ class Multiplexer(EventEmitter): else: logger.warning(f'unexpected state for DM: {self}') - def on_disc_frame(self, _frame) -> None: + def on_disc_frame(self, _frame: RFCOMM_Frame) -> None: self.change_state(Multiplexer.DISCONNECTED) self.send_frame( RFCOMM_Frame.ua(c_r=0 if self.role == Multiplexer.INITIATOR else 1, dlci=0) @@ -729,11 +738,11 @@ class Multiplexer(EventEmitter): mcs = RFCOMM_MCC_MSC.from_bytes(value) self.on_mcc_msc(c_r, mcs) - def on_ui_frame(self, frame) -> None: + def on_ui_frame(self, frame: RFCOMM_Frame) -> None: pass - def on_mcc_pn(self, c_r, pn) -> None: - if c_r == 1: + def on_mcc_pn(self, c_r: bool, pn: RFCOMM_MCC_PN) -> None: + if c_r: # Command logger.debug(f'<<< PN Command: {pn}') @@ -771,7 +780,7 @@ class Multiplexer(EventEmitter): else: logger.warning('ignoring PN response') - def on_mcc_msc(self, c_r, msc) -> None: + def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None: dlc = self.dlcs.get(msc.dlci) if dlc is None: logger.warning(f'no dlc for DLCI {msc.dlci}') @@ -831,13 +840,13 @@ class Multiplexer(EventEmitter): self.open_result = None return result - def on_dlc_open_complete(self, dlc: DLC): + def on_dlc_open_complete(self, dlc: DLC) -> None: logger.debug(f'DLC [{dlc.dlci}] open complete') self.change_state(Multiplexer.CONNECTED) if self.open_result: self.open_result.set_result(dlc) - def __str__(self): + def __str__(self) -> str: return f'Multiplexer(state={self.state_name(self.state)})' @@ -846,7 +855,7 @@ class Client: multiplexer: Optional[Multiplexer] l2cap_channel: Optional[l2cap.Channel] - def __init__(self, device, connection) -> None: + def __init__(self, device: 'Device', connection: 'Connection') -> None: self.device = device self.connection = connection self.l2cap_channel = None @@ -886,7 +895,7 @@ class Client: class Server(EventEmitter): acceptors: Dict[int, Callable[[DLC], None]] - def __init__(self, device) -> None: + def __init__(self, device: 'Device') -> None: super().__init__() self.device = device self.multiplexer = None