L2CAP: FCS Implementation

This commit is contained in:
Josh Wu
2025-09-26 21:59:56 +08:00
parent 57e05781ad
commit 456cb59b48
6 changed files with 335 additions and 170 deletions

View File

@@ -12,35 +12,42 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import argparse
import asyncio
import sys
from typing import Optional
import bumble.logging
from bumble import core, l2cap
from bumble.device import Device
from bumble import l2cap
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
async def main() -> None:
async def main(
config_file: str, transport: str, mode: int, peer_address: str, psm: int
) -> None:
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[2]) as hci_transport:
async with await open_transport(transport) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
config_file, hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
device.l2cap_channel_manager.extended_features.add(
l2cap.L2CAP_Information_Request.ExtendedFeatures.ENHANCED_RETRANSMISSION_MODE
)
device.l2cap_channel_manager.extended_features.add(
l2cap.L2CAP_Information_Request.ExtendedFeatures.FCS_OPTION
)
# Start the controller
await device.power_on()
@@ -49,7 +56,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
channels: list[l2cap.ClassicChannel] = []
active_channel: l2cap.ClassicChannel | None = None
def on_connection(channel: l2cap.ClassicChannel):
@@ -57,25 +64,44 @@ async def main() -> None:
print(f'<<< {sdu.decode()}')
channel.sink = on_sdu
if channels:
channels.clear()
channels.append(channel)
nonlocal active_channel
active_channel = channel
server = device.create_l2cap_server(
spec=l2cap.ClassicChannelSpec(
mode=l2cap.TransmissionMode.ENHANCED_RETRANSMISSION
mode=l2cap.TransmissionMode(mode), psm=psm if psm else None
),
handler=on_connection,
)
print(f'Listen L2CAP on channel {server.psm}')
if peer_address:
connection = await device.connect(
peer_address, transport=core.PhysicalTransport.BR_EDR
)
channel = await connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(
mode=l2cap.TransmissionMode(mode), psm=psm
)
)
active_channel = channel
while sdu := await asyncio.to_thread(lambda: input('>>> ')):
if channels:
channels[0].write(sdu.encode())
if active_channel:
active_channel.write(sdu.encode())
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
bumble.logging.setup_basic_logging('INFO')
asyncio.run(main())
parser = argparse.ArgumentParser()
parser.add_argument('config')
parser.add_argument('transport')
parser.add_argument('-p', '--peer_address', default='')
parser.add_argument(
'-m', '--mode', default=l2cap.TransmissionMode.ENHANCED_RETRANSMISSION
)
parser.add_argument('--psm', default=0)
args = parser.parse_args(sys.argv[1:])
asyncio.run(main(args.config, args.transport, args.mode, args.peer_address, args.psm))