Add Classic Local Link support

Currently supported features:
* Connect
* Accept
* Switch Role
* Disconnect
* ACL data transmittion
This commit is contained in:
Josh Wu
2023-03-24 15:20:36 +08:00
parent e77723a5f9
commit 21d8a0d577
4 changed files with 435 additions and 50 deletions

View File

@@ -22,6 +22,7 @@ import os
import pytest
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -47,18 +48,19 @@ class TwoDevices:
def __init__(self):
self.connections = [None, None]
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.link = LocalLink()
self.controllers = [
Controller('C1', link=self.link),
Controller('C2', link=self.link),
Controller('C1', link=self.link, public_address=addresses[0]),
Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
Device(
address='F0:F1:F2:F3:F4:F5',
address=addresses[0],
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
),
Device(
address='F5:F4:F3:F2:F1:F0',
address=addresses[1],
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
),
]
@@ -98,6 +100,49 @@ async def test_self_connection():
assert two_devices.connections[1] is not None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
'responder_role,',
(BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE),
)
async def test_self_classic_connection(responder_role):
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on(
'connection', lambda connection: two_devices.on_connection(0, connection)
)
two_devices.devices[1].on(
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Enable Classic connections
two_devices.devices[0].classic_enabled = True
two_devices.devices[1].classic_enabled = True
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await asyncio.gather(
two_devices.devices[0].connect(
two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT
),
two_devices.devices[1].accept(
two_devices.devices[0].public_address, responder_role
),
)
# Check the post conditions
assert two_devices.connections[0] is not None
assert two_devices.connections[1] is not None
await two_devices.connections[0].disconnect()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_gatt():