RFCOMM: Refactor role to enum

This commit is contained in:
Josh Wu
2023-08-21 15:16:34 +08:00
parent 2527a711dc
commit e6b566b848
+12 -10
View File
@@ -387,7 +387,7 @@ class DLC(EventEmitter):
self.tx_buffer = b''
self.state = DLC.State.INIT
self.role = multiplexer.role
self.c_r = 1 if self.role == Multiplexer.INITIATOR else 0
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.sink = None
self.connection_result = None
@@ -593,9 +593,9 @@ class DLC(EventEmitter):
# -----------------------------------------------------------------------------
class Multiplexer(EventEmitter):
# Roles
INITIATOR = 0x00
RESPONDER = 0x01
class Role(enum.IntEnum):
INITIATOR = 0x00
RESPONDER = 0x01
class State(enum.IntEnum):
INIT = 0x00
@@ -612,7 +612,7 @@ class Multiplexer(EventEmitter):
acceptor: Optional[Callable[[int], bool]]
dlcs: Dict[int, DLC]
def __init__(self, l2cap_channel: l2cap.Channel, role: int) -> None:
def __init__(self, l2cap_channel: l2cap.Channel, role: Role) -> None:
super().__init__()
self.role = role
self.l2cap_channel = l2cap_channel
@@ -695,7 +695,9 @@ class Multiplexer(EventEmitter):
def on_disc_frame(self, _frame: RFCOMM_Frame) -> None:
self.change_state(Multiplexer.State.DISCONNECTED)
self.send_frame(
RFCOMM_Frame.ua(c_r=0 if self.role == Multiplexer.INITIATOR else 1, dlci=0)
RFCOMM_Frame.ua(
c_r=0 if self.role == Multiplexer.Role.INITIATOR else 1, dlci=0
)
)
def on_uih_frame(self, frame: RFCOMM_Frame) -> None:
@@ -774,7 +776,7 @@ class Multiplexer(EventEmitter):
self.change_state(Multiplexer.State.DISCONNECTING)
self.send_frame(
RFCOMM_Frame.disc(
c_r=1 if self.role == Multiplexer.INITIATOR else 0, dlci=0
c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=0
)
)
await self.disconnection_result
@@ -801,7 +803,7 @@ class Multiplexer(EventEmitter):
self.change_state(Multiplexer.State.OPENING)
self.send_frame(
RFCOMM_Frame.uih(
c_r=1 if self.role == Multiplexer.INITIATOR else 0,
c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0,
dlci=0,
information=mcc,
)
@@ -843,7 +845,7 @@ class Client:
assert self.l2cap_channel is not None
# Create a mutliplexer to manage DLCs with the server
self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.INITIATOR)
self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.Role.INITIATOR)
# Connect the multiplexer
await self.multiplexer.connect()
@@ -904,7 +906,7 @@ class Server(EventEmitter):
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
# Create a new multiplexer for the channel
multiplexer = Multiplexer(l2cap_channel, Multiplexer.RESPONDER)
multiplexer = Multiplexer(l2cap_channel, Multiplexer.Role.RESPONDER)
multiplexer.acceptor = self.accept_dlc
multiplexer.on('dlc', self.on_dlc)