Complete typing in RFCOMM

This commit is contained in:
Josh Wu
2023-07-27 19:19:11 +08:00
parent f4add16aea
commit 5fba6b1cae
+43 -34
View File
@@ -19,12 +19,15 @@ import logging
import asyncio
from pyee import EventEmitter
from typing import Optional, Tuple, Callable, Dict, Union
from typing import Optional, Tuple, Callable, Dict, Union, TYPE_CHECKING
from . import core, l2cap
from .colors import color
from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError
if TYPE_CHECKING:
from bumble.device import Device, Connection
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -149,9 +152,9 @@ class RFCOMM_Frame:
return RFCOMM_FRAME_TYPE_NAMES[self.type]
@staticmethod
def parse_mcc(data) -> Tuple[int, int, bytes]:
def parse_mcc(data) -> Tuple[int, bool, bytes]:
mcc_type = data[0] >> 2
c_r = (data[0] >> 1) & 1
c_r = bool((data[0] >> 1) & 1)
length = data[1]
if data[1] & 1:
length >>= 1
@@ -215,7 +218,7 @@ class RFCOMM_Frame:
return frame
def __bytes__(self):
def __bytes__(self) -> bytes:
return (
bytes([self.address, self.control])
+ self.length
@@ -223,7 +226,7 @@ class RFCOMM_Frame:
+ bytes([self.fcs])
)
def __str__(self):
def __str__(self) -> str:
return (
f'{color(self.type_name(), "yellow")}'
f'(c/r={self.c_r},'
@@ -253,7 +256,7 @@ class RFCOMM_MCC_PN:
max_frame_size: int,
max_retransmissions: int,
window_size: int,
):
) -> None:
self.dlci = dlci
self.cl = cl
self.priority = priority
@@ -263,7 +266,7 @@ class RFCOMM_MCC_PN:
self.window_size = window_size
@staticmethod
def from_bytes(data: bytes):
def from_bytes(data: bytes) -> 'RFCOMM_MCC_PN':
return RFCOMM_MCC_PN(
dlci=data[0],
cl=data[1],
@@ -274,7 +277,7 @@ class RFCOMM_MCC_PN:
window_size=data[7],
)
def __bytes__(self):
def __bytes__(self) -> bytes:
return bytes(
[
self.dlci & 0xFF,
@@ -288,7 +291,7 @@ class RFCOMM_MCC_PN:
]
)
def __str__(self):
def __str__(self) -> str:
return (
f'PN(dlci={self.dlci},'
f'cl={self.cl},'
@@ -309,7 +312,9 @@ class RFCOMM_MCC_MSC:
ic: int
dv: int
def __init__(self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int):
def __init__(
self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int
) -> None:
self.dlci = dlci
self.fc = fc
self.rtc = rtc
@@ -318,7 +323,7 @@ class RFCOMM_MCC_MSC:
self.dv = dv
@staticmethod
def from_bytes(data: bytes):
def from_bytes(data: bytes) -> 'RFCOMM_MCC_MSC':
return RFCOMM_MCC_MSC(
dlci=data[0] >> 2,
fc=data[1] >> 1 & 1,
@@ -328,7 +333,7 @@ class RFCOMM_MCC_MSC:
dv=data[1] >> 7 & 1,
)
def __bytes__(self):
def __bytes__(self) -> bytes:
return bytes(
[
(self.dlci << 2) | 3,
@@ -341,7 +346,7 @@ class RFCOMM_MCC_MSC:
]
)
def __str__(self):
def __str__(self) -> str:
return (
f'MSC(dlci={self.dlci},'
f'fc={self.fc},'
@@ -375,8 +380,12 @@ class DLC(EventEmitter):
sink: Optional[Callable[[bytes], None]]
def __init__(
self, multiplexer, dlci: int, max_frame_size: int, initial_tx_credits: int
):
self,
multiplexer: 'Multiplexer',
dlci: int,
max_frame_size: int,
initial_tx_credits: int,
) -> None:
super().__init__()
self.multiplexer = multiplexer
self.dlci = dlci
@@ -413,7 +422,7 @@ class DLC(EventEmitter):
handler = getattr(self, f'on_{frame.type_name()}_frame'.lower())
handler(frame)
def on_sabm_frame(self, _frame) -> None:
def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
if self.state != DLC.CONNECTING:
logger.warning(
color('!!! received SABM when not in CONNECTING state', 'red')
@@ -433,7 +442,7 @@ class DLC(EventEmitter):
self.change_state(DLC.CONNECTED)
self.emit('open')
def on_ua_frame(self, _frame) -> None:
def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
if self.state != DLC.CONNECTING:
logger.warning(
color('!!! received SABM when not in CONNECTING state', 'red')
@@ -451,11 +460,11 @@ class DLC(EventEmitter):
self.change_state(DLC.CONNECTED)
self.multiplexer.on_dlc_open_complete(self)
def on_dm_frame(self, frame) -> None:
def on_dm_frame(self, frame: RFCOMM_Frame) -> None:
# TODO: handle all states
pass
def on_disc_frame(self, _frame) -> None:
def on_disc_frame(self, _frame: RFCOMM_Frame) -> None:
# TODO: handle all states
self.send_frame(RFCOMM_Frame.ua(c_r=1 - self.c_r, dlci=self.dlci))
@@ -489,10 +498,10 @@ class DLC(EventEmitter):
# Check if there's anything to send (including credits)
self.process_tx()
def on_ui_frame(self, frame) -> None:
def on_ui_frame(self, frame: RFCOMM_Frame) -> None:
pass
def on_mcc_msc(self, c_r, msc) -> None:
def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None:
if c_r:
# Command
logger.debug(f'<<< MCC MSC Command: {msc}')
@@ -592,7 +601,7 @@ class DLC(EventEmitter):
# TODO
pass
def __str__(self):
def __str__(self) -> str:
return f'DLC(dlci={self.dlci},state={self.state_name(self.state)})'
@@ -679,14 +688,14 @@ class Multiplexer(EventEmitter):
handler = getattr(self, f'on_{frame.type_name()}_frame'.lower())
handler(frame)
def on_sabm_frame(self, _frame) -> None:
def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
if self.state != Multiplexer.INIT:
logger.debug('not in INIT state, ignoring SABM')
return
self.change_state(Multiplexer.CONNECTED)
self.send_frame(RFCOMM_Frame.ua(c_r=1, dlci=0))
def on_ua_frame(self, _frame) -> None:
def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
if self.state == Multiplexer.CONNECTING:
self.change_state(Multiplexer.CONNECTED)
if self.connection_result:
@@ -698,7 +707,7 @@ class Multiplexer(EventEmitter):
self.disconnection_result.set_result(None)
self.disconnection_result = None
def on_dm_frame(self, _frame) -> None:
def on_dm_frame(self, _frame: RFCOMM_Frame) -> None:
if self.state == Multiplexer.OPENING:
self.change_state(Multiplexer.CONNECTED)
if self.open_result:
@@ -713,7 +722,7 @@ class Multiplexer(EventEmitter):
else:
logger.warning(f'unexpected state for DM: {self}')
def on_disc_frame(self, _frame) -> None:
def on_disc_frame(self, _frame: RFCOMM_Frame) -> None:
self.change_state(Multiplexer.DISCONNECTED)
self.send_frame(
RFCOMM_Frame.ua(c_r=0 if self.role == Multiplexer.INITIATOR else 1, dlci=0)
@@ -729,11 +738,11 @@ class Multiplexer(EventEmitter):
mcs = RFCOMM_MCC_MSC.from_bytes(value)
self.on_mcc_msc(c_r, mcs)
def on_ui_frame(self, frame) -> None:
def on_ui_frame(self, frame: RFCOMM_Frame) -> None:
pass
def on_mcc_pn(self, c_r, pn) -> None:
if c_r == 1:
def on_mcc_pn(self, c_r: bool, pn: RFCOMM_MCC_PN) -> None:
if c_r:
# Command
logger.debug(f'<<< PN Command: {pn}')
@@ -771,7 +780,7 @@ class Multiplexer(EventEmitter):
else:
logger.warning('ignoring PN response')
def on_mcc_msc(self, c_r, msc) -> None:
def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None:
dlc = self.dlcs.get(msc.dlci)
if dlc is None:
logger.warning(f'no dlc for DLCI {msc.dlci}')
@@ -831,13 +840,13 @@ class Multiplexer(EventEmitter):
self.open_result = None
return result
def on_dlc_open_complete(self, dlc: DLC):
def on_dlc_open_complete(self, dlc: DLC) -> None:
logger.debug(f'DLC [{dlc.dlci}] open complete')
self.change_state(Multiplexer.CONNECTED)
if self.open_result:
self.open_result.set_result(dlc)
def __str__(self):
def __str__(self) -> str:
return f'Multiplexer(state={self.state_name(self.state)})'
@@ -846,7 +855,7 @@ class Client:
multiplexer: Optional[Multiplexer]
l2cap_channel: Optional[l2cap.Channel]
def __init__(self, device, connection) -> None:
def __init__(self, device: 'Device', connection: 'Connection') -> None:
self.device = device
self.connection = connection
self.l2cap_channel = None
@@ -886,7 +895,7 @@ class Client:
class Server(EventEmitter):
acceptors: Dict[int, Callable[[DLC], None]]
def __init__(self, device) -> None:
def __init__(self, device: 'Device') -> None:
super().__init__()
self.device = device
self.multiplexer = None