From 21497d0a40438a986458c7d2d95551c2ca6b8b7f Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Thu, 27 Jul 2023 11:29:23 -0700 Subject: [PATCH] use type object instead of type strings --- bumble/l2cap.py | 54 +++++++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/bumble/l2cap.py b/bumble/l2cap.py index 7abde0d..4464afc 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -724,12 +724,12 @@ class Channel(EventEmitter): response: Optional[asyncio.Future[bytes]] sink: Optional[Callable[[bytes], Any]] state: int - connection: 'Connection' + connection: Connection def __init__( self, manager: 'ChannelManager', - connection: 'Connection', + connection: Connection, signaling_cid: int, psm: int, source_cid: int, @@ -1033,7 +1033,7 @@ class LeConnectionOrientedChannel(EventEmitter): disconnection_result: Optional[asyncio.Future[None]] out_sdu: Optional[bytes] state: int - connection: 'Connection' + connection: Connection @staticmethod def state_name(state: int) -> str: @@ -1042,7 +1042,7 @@ class LeConnectionOrientedChannel(EventEmitter): def __init__( self, manager: 'ChannelManager', - connection: 'Connection', + connection: Connection, le_psm: int, source_cid: int, destination_cid: int, @@ -1471,7 +1471,7 @@ class ChannelManager: ): raise ValueError('MPS out of range') - def next_identifier(self, connection: 'Connection') -> int: + def next_identifier(self, connection: Connection) -> int: identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256 self.identifiers[connection.handle] = identifier return identifier @@ -1574,7 +1574,7 @@ class ChannelManager: ) self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu)) - def on_pdu(self, connection: 'Connection', cid: int, pdu) -> None: + def on_pdu(self, connection: Connection, cid: int, pdu) -> None: if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID): # Parse the L2CAP payload into a Control Frame object control_frame = L2CAP_Control_Frame.from_bytes(pdu) @@ -1596,7 +1596,7 @@ class ChannelManager: channel.on_pdu(pdu) def send_control_frame( - self, connection: 'Connection', cid: int, control_frame + self, connection: Connection, cid: int, control_frame ) -> None: logger.debug( f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} ' @@ -1605,9 +1605,7 @@ class ChannelManager: ) self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame)) - def on_control_frame( - self, connection: 'Connection', cid: int, control_frame - ) -> None: + def on_control_frame(self, connection: Connection, cid: int, control_frame) -> None: logger.debug( f'{color("<<< Received L2CAP Signaling Control Frame", "green")} ' f'on connection [0x{connection.handle:04X}] (CID={cid}) ' @@ -1645,12 +1643,12 @@ class ChannelManager: ) def on_l2cap_command_reject( - self, _connection: 'Connection', _cid: int, packet + self, _connection: Connection, _cid: int, packet ) -> None: logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}') def on_l2cap_connection_request( - self, connection: 'Connection', cid: int, request + self, connection: Connection, cid: int, request ) -> None: # Check if there's a server for this PSM server = self.servers.get(request.psm) @@ -1704,7 +1702,7 @@ class ChannelManager: ) def on_l2cap_connection_response( - self, connection: 'Connection', cid: int, response + self, connection: Connection, cid: int, response ) -> None: if ( channel := self.find_channel(connection.handle, response.source_cid) @@ -1721,7 +1719,7 @@ class ChannelManager: channel.on_connection_response(response) def on_l2cap_configure_request( - self, connection: 'Connection', cid: int, request + self, connection: Connection, cid: int, request ) -> None: if ( channel := self.find_channel(connection.handle, request.destination_cid) @@ -1738,7 +1736,7 @@ class ChannelManager: channel.on_configure_request(request) def on_l2cap_configure_response( - self, connection: 'Connection', cid: int, response + self, connection: Connection, cid: int, response ) -> None: if ( channel := self.find_channel(connection.handle, response.source_cid) @@ -1755,7 +1753,7 @@ class ChannelManager: channel.on_configure_response(response) def on_l2cap_disconnection_request( - self, connection: 'Connection', cid: int, request + self, connection: Connection, cid: int, request ) -> None: if ( channel := self.find_channel(connection.handle, request.destination_cid) @@ -1772,7 +1770,7 @@ class ChannelManager: channel.on_disconnection_request(request) def on_l2cap_disconnection_response( - self, connection: 'Connection', cid: int, response + self, connection: Connection, cid: int, response ) -> None: if ( channel := self.find_channel(connection.handle, response.source_cid) @@ -1788,9 +1786,7 @@ class ChannelManager: channel.on_disconnection_response(response) - def on_l2cap_echo_request( - self, connection: 'Connection', cid: int, request - ) -> None: + def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None: logger.debug(f'<<< Echo request: data={request.data.hex()}') self.send_control_frame( connection, @@ -1799,13 +1795,13 @@ class ChannelManager: ) def on_l2cap_echo_response( - self, _connection: 'Connection', _cid: int, response + self, _connection: Connection, _cid: int, response ) -> None: logger.debug(f'<<< Echo response: data={response.data.hex()}') # TODO notify listeners def on_l2cap_information_request( - self, connection: 'Connection', cid: int, request + self, connection: Connection, cid: int, request ) -> None: if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU: result = L2CAP_Information_Response.SUCCESS @@ -1831,7 +1827,7 @@ class ChannelManager: ) def on_l2cap_connection_parameter_update_request( - self, connection: 'Connection', cid: int, request + self, connection: Connection, cid: int, request ): if connection.role == BT_CENTRAL_ROLE: self.send_control_frame( @@ -1864,13 +1860,13 @@ class ChannelManager: ) def on_l2cap_connection_parameter_update_response( - self, connection: 'Connection', cid: int, response + self, connection: Connection, cid: int, response ) -> None: # TODO: check response pass def on_l2cap_le_credit_based_connection_request( - self, connection: 'Connection', cid: int, request + self, connection: Connection, cid: int, request ) -> None: if request.le_psm in self.le_coc_servers: (server, max_credits, mtu, mps) = self.le_coc_servers[request.le_psm] @@ -1974,7 +1970,7 @@ class ChannelManager: ) def on_l2cap_le_credit_based_connection_response( - self, connection: 'Connection', _cid: int, response + self, connection: Connection, _cid: int, response ) -> None: # Find the pending request by identifier request = self.le_coc_requests.get(response.identifier) @@ -1999,7 +1995,7 @@ class ChannelManager: channel.on_connection_response(response) def on_l2cap_le_flow_control_credit( - self, connection: 'Connection', _cid: int, credit + self, connection: Connection, _cid: int, credit ) -> None: channel = self.find_le_coc_channel(connection.handle, credit.cid) if channel is None: @@ -2015,7 +2011,7 @@ class ChannelManager: del connection_channels[channel.source_cid] async def open_le_coc( - self, connection: 'Connection', psm: int, max_credits: int, mtu: int, mps: int + self, connection: Connection, psm: int, max_credits: int, mtu: int, mps: int ) -> LeConnectionOrientedChannel: self.check_le_coc_parameters(max_credits, mtu, mps) @@ -2057,7 +2053,7 @@ class ChannelManager: return channel - async def connect(self, connection: 'Connection', psm: int) -> Channel: + async def connect(self, connection: Connection, psm: int) -> Channel: # NOTE: this implementation hard-codes BR/EDR # Find a free CID for a new channel