Handle L2CAP info dynamically (#28)

* Add feature and MTU fields in L2CAP manager constructor
* Add register/unregister API for fixed channels
This commit is contained in:
zxzxwu
2022-08-18 23:25:59 +08:00
committed by GitHub
parent 27cb4c586b
commit 996a9e28f4
3 changed files with 40 additions and 24 deletions
+5 -3
View File
@@ -450,7 +450,8 @@ class Device(CompositeEventEmitter):
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self)
self.l2cap_channel_manager = l2cap.ChannelManager()
self.l2cap_channel_manager = l2cap.ChannelManager(
[l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS])
self.advertisement_data = {}
self.scanning = False
self.discovering = False
@@ -494,6 +495,8 @@ class Device(CompositeEventEmitter):
# Setup SMP
# TODO: allow using a public address
self.smp_manager = smp.Manager(self, self.random_address)
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_CID, self.on_smp_pdu)
# Register the SDP server with the L2CAP Channel Manager
self.sdp_server.register(self.l2cap_channel_manager)
@@ -501,6 +504,7 @@ class Device(CompositeEventEmitter):
# Add a GAP Service if requested
if generic_access_service:
self.gatt_server.add_service(GenericAccessService(self.name))
self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu)
# Forward some events
setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription')
@@ -1503,7 +1507,6 @@ class Device(CompositeEventEmitter):
def on_pairing_failure(self, connection, reason):
connection.emit('pairing_failure', reason)
@host_event_handler
@with_connection_from_handle
def on_gatt_pdu(self, connection, pdu):
# Parse the L2CAP payload into an ATT PDU object
@@ -1522,7 +1525,6 @@ class Device(CompositeEventEmitter):
return
connection.gatt_server.on_gatt_pdu(connection, att_pdu)
@host_event_handler
@with_connection_from_handle
def on_smp_pdu(self, connection, pdu):
self.smp_manager.on_smp_pdu(connection, pdu)
+1 -13
View File
@@ -56,13 +56,7 @@ class Connection:
def on_acl_pdu(self, pdu):
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
if l2cap_pdu.cid == ATT_CID:
self.host.on_gatt_pdu(self, l2cap_pdu.payload)
elif l2cap_pdu.cid == SMP_CID:
self.host.on_smp_pdu(self, l2cap_pdu.payload)
else:
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
# -----------------------------------------------------------------------------
@@ -299,12 +293,6 @@ class Host(EventEmitter):
if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet)
def on_gatt_pdu(self, connection, pdu):
self.emit('gatt_pdu', connection.handle, pdu)
def on_smp_pdu(self, connection, pdu):
self.emit('smp_pdu', connection.handle, pdu)
def on_l2cap_pdu(self, connection, cid, pdu):
self.emit('l2cap_pdu', connection.handle, cid, pdu)
+34 -8
View File
@@ -414,6 +414,18 @@ class L2CAP_Information_Request(L2CAP_Control_Frame):
EXTENDED_FEATURES_SUPPORTED = 0x0002
FIXED_CHANNELS_SUPPORTED = 0x0003
EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001
EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002
EXTENDED_FEATURE_BIDIRECTIONAL_QOS = 0x0004
EXTENDED_FEATURE_ENHANCED_RETRANSMISSION_MODE = 0x0008
EXTENDED_FEATURE_STREAMING_MODE = 0x0010
EXTENDED_FEATURE_FCS_OPTION = 0x0020
EXTENDED_FEATURE_EXTENDED_FLOW_SPEC = 0x0040
EXTENDED_FEATURE_FIXED_CHANNELS = 0x0080
EXTENDED_FEATURE_EXTENDED_WINDOW_SIZE = 0x0100
EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200
EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400
INFO_TYPE_NAMES = {
CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU',
EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED',
@@ -817,11 +829,16 @@ class Channel(EventEmitter):
# -----------------------------------------------------------------------------
class ChannelManager:
def __init__(self):
self.host = None
self.channels = {} # Channels, mapped by connection and cid
self.identifiers = {} # Incrementing identifier values by connection
self.servers = {} # Servers accepting connections, by PSM
def __init__(self, extended_features=None, connectionless_mtu=1024):
self.host = None
self.channels = {} # Channels, mapped by connection and cid
# Fixed channel handlers, mapped by cid
self.fixed_channels = {
L2CAP_SIGNALING_CID: None, L2CAP_LE_SIGNALING_CID: None}
self.identifiers = {} # Incrementing identifier values by connection
self.servers = {} # Servers accepting connections, by PSM
self.extended_features = [] if extended_features is None else extended_features
self.connectionless_mtu = connectionless_mtu
def find_channel(self, connection_handle, cid):
if connection_channels := self.channels.get(connection_handle):
@@ -840,6 +857,13 @@ class ChannelManager:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
self.identifiers[connection.handle] = identifier
return identifier
def register_fixed_channel(self, cid, handler):
self.fixed_channels[cid] = handler
def deregister_fixed_channel(self, cid):
if cid in self.fixed_channels:
del self.fixed_channels[cid]
def register_server(self, psm, server):
self.servers[psm] = server
@@ -855,6 +879,8 @@ class ChannelManager:
control_frame = L2CAP_Control_Frame.from_bytes(pdu)
self.on_control_frame(connection, cid, control_frame)
elif cid in self.fixed_channels:
self.fixed_channels[cid](connection.handle, pdu)
else:
if (channel := self.find_channel(connection.handle, cid)) is None:
logger.warn(color(f'channel not found for 0x{connection.handle:04X}:{cid}', 'red'))
@@ -999,13 +1025,13 @@ class ChannelManager:
def on_l2cap_information_request(self, connection, cid, request):
if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
result = L2CAP_Information_Response.SUCCESS
data = struct.pack('<H', 1024) # TODO: don't use a fixed value
data = self.connectionless_mtu.to_bytes(2, 'little')
elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED:
result = L2CAP_Information_Response.SUCCESS
data = bytes.fromhex('00000000') # TODO: don't use a fixed value
data = sum(self.extended_features).to_bytes(4, 'little')
elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED:
result = L2CAP_Information_Response.SUCCESS
data = bytes.fromhex('FFFFFFFFFFFFFFFF') # TODO: don't use a fixed value
data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little')
else:
result = L2CAP_Information_Request.NO_SUPPORTED