Compare commits

...

1 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
21497d0a40 use type object instead of type strings 2023-07-27 11:29:23 -07:00

View File

@@ -724,12 +724,12 @@ class Channel(EventEmitter):
response: Optional[asyncio.Future[bytes]]
sink: Optional[Callable[[bytes], Any]]
state: int
connection: 'Connection'
connection: Connection
def __init__(
self,
manager: 'ChannelManager',
connection: 'Connection',
connection: Connection,
signaling_cid: int,
psm: int,
source_cid: int,
@@ -1033,7 +1033,7 @@ class LeConnectionOrientedChannel(EventEmitter):
disconnection_result: Optional[asyncio.Future[None]]
out_sdu: Optional[bytes]
state: int
connection: 'Connection'
connection: Connection
@staticmethod
def state_name(state: int) -> str:
@@ -1042,7 +1042,7 @@ class LeConnectionOrientedChannel(EventEmitter):
def __init__(
self,
manager: 'ChannelManager',
connection: 'Connection',
connection: Connection,
le_psm: int,
source_cid: int,
destination_cid: int,
@@ -1471,7 +1471,7 @@ class ChannelManager:
):
raise ValueError('MPS out of range')
def next_identifier(self, connection: 'Connection') -> int:
def next_identifier(self, connection: Connection) -> int:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
self.identifiers[connection.handle] = identifier
return identifier
@@ -1574,7 +1574,7 @@ class ChannelManager:
)
self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu))
def on_pdu(self, connection: 'Connection', cid: int, pdu) -> None:
def on_pdu(self, connection: Connection, cid: int, pdu) -> None:
if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
# Parse the L2CAP payload into a Control Frame object
control_frame = L2CAP_Control_Frame.from_bytes(pdu)
@@ -1596,7 +1596,7 @@ class ChannelManager:
channel.on_pdu(pdu)
def send_control_frame(
self, connection: 'Connection', cid: int, control_frame
self, connection: Connection, cid: int, control_frame
) -> None:
logger.debug(
f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} '
@@ -1605,9 +1605,7 @@ class ChannelManager:
)
self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame))
def on_control_frame(
self, connection: 'Connection', cid: int, control_frame
) -> None:
def on_control_frame(self, connection: Connection, cid: int, control_frame) -> None:
logger.debug(
f'{color("<<< Received L2CAP Signaling Control Frame", "green")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
@@ -1645,12 +1643,12 @@ class ChannelManager:
)
def on_l2cap_command_reject(
self, _connection: 'Connection', _cid: int, packet
self, _connection: Connection, _cid: int, packet
) -> None:
logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
def on_l2cap_connection_request(
self, connection: 'Connection', cid: int, request
self, connection: Connection, cid: int, request
) -> None:
# Check if there's a server for this PSM
server = self.servers.get(request.psm)
@@ -1704,7 +1702,7 @@ class ChannelManager:
)
def on_l2cap_connection_response(
self, connection: 'Connection', cid: int, response
self, connection: Connection, cid: int, response
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
@@ -1721,7 +1719,7 @@ class ChannelManager:
channel.on_connection_response(response)
def on_l2cap_configure_request(
self, connection: 'Connection', cid: int, request
self, connection: Connection, cid: int, request
) -> None:
if (
channel := self.find_channel(connection.handle, request.destination_cid)
@@ -1738,7 +1736,7 @@ class ChannelManager:
channel.on_configure_request(request)
def on_l2cap_configure_response(
self, connection: 'Connection', cid: int, response
self, connection: Connection, cid: int, response
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
@@ -1755,7 +1753,7 @@ class ChannelManager:
channel.on_configure_response(response)
def on_l2cap_disconnection_request(
self, connection: 'Connection', cid: int, request
self, connection: Connection, cid: int, request
) -> None:
if (
channel := self.find_channel(connection.handle, request.destination_cid)
@@ -1772,7 +1770,7 @@ class ChannelManager:
channel.on_disconnection_request(request)
def on_l2cap_disconnection_response(
self, connection: 'Connection', cid: int, response
self, connection: Connection, cid: int, response
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
@@ -1788,9 +1786,7 @@ class ChannelManager:
channel.on_disconnection_response(response)
def on_l2cap_echo_request(
self, connection: 'Connection', cid: int, request
) -> None:
def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None:
logger.debug(f'<<< Echo request: data={request.data.hex()}')
self.send_control_frame(
connection,
@@ -1799,13 +1795,13 @@ class ChannelManager:
)
def on_l2cap_echo_response(
self, _connection: 'Connection', _cid: int, response
self, _connection: Connection, _cid: int, response
) -> None:
logger.debug(f'<<< Echo response: data={response.data.hex()}')
# TODO notify listeners
def on_l2cap_information_request(
self, connection: 'Connection', cid: int, request
self, connection: Connection, cid: int, request
) -> None:
if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
result = L2CAP_Information_Response.SUCCESS
@@ -1831,7 +1827,7 @@ class ChannelManager:
)
def on_l2cap_connection_parameter_update_request(
self, connection: 'Connection', cid: int, request
self, connection: Connection, cid: int, request
):
if connection.role == BT_CENTRAL_ROLE:
self.send_control_frame(
@@ -1864,13 +1860,13 @@ class ChannelManager:
)
def on_l2cap_connection_parameter_update_response(
self, connection: 'Connection', cid: int, response
self, connection: Connection, cid: int, response
) -> None:
# TODO: check response
pass
def on_l2cap_le_credit_based_connection_request(
self, connection: 'Connection', cid: int, request
self, connection: Connection, cid: int, request
) -> None:
if request.le_psm in self.le_coc_servers:
(server, max_credits, mtu, mps) = self.le_coc_servers[request.le_psm]
@@ -1974,7 +1970,7 @@ class ChannelManager:
)
def on_l2cap_le_credit_based_connection_response(
self, connection: 'Connection', _cid: int, response
self, connection: Connection, _cid: int, response
) -> None:
# Find the pending request by identifier
request = self.le_coc_requests.get(response.identifier)
@@ -1999,7 +1995,7 @@ class ChannelManager:
channel.on_connection_response(response)
def on_l2cap_le_flow_control_credit(
self, connection: 'Connection', _cid: int, credit
self, connection: Connection, _cid: int, credit
) -> None:
channel = self.find_le_coc_channel(connection.handle, credit.cid)
if channel is None:
@@ -2015,7 +2011,7 @@ class ChannelManager:
del connection_channels[channel.source_cid]
async def open_le_coc(
self, connection: 'Connection', psm: int, max_credits: int, mtu: int, mps: int
self, connection: Connection, psm: int, max_credits: int, mtu: int, mps: int
) -> LeConnectionOrientedChannel:
self.check_le_coc_parameters(max_credits, mtu, mps)
@@ -2057,7 +2053,7 @@ class ChannelManager:
return channel
async def connect(self, connection: 'Connection', psm: int) -> Channel:
async def connect(self, connection: Connection, psm: int) -> Channel:
# NOTE: this implementation hard-codes BR/EDR
# Find a free CID for a new channel