Compare commits

..

7 Commits

Author SHA1 Message Date
zxzxwu
996a9e28f4 Handle L2CAP info dynamically (#28)
* Add feature and MTU fields in L2CAP manager constructor
* Add register/unregister API for fixed channels
2022-08-18 08:25:59 -07:00
zxzxwu
27cb4c586b Delegate Classic connectable and discoverable (#27)
For remote-initiated test cases, we need the device to be
scan-configurable.
2022-08-17 14:20:32 -07:00
Gilles Boccon-Gibod
1f78243ea6 add test.release task to facilitate CI integration (#26) 2022-08-16 13:37:26 -07:00
Ray
216ce2abd0 Add release tasks (#6)
Added two tasks to tasks.py, release and release_tests.

Applied black formatter

authored-by: Raymundo Ramirez Mata <raymundora@google.com>
2022-08-16 11:50:30 -07:00
Gilles Boccon-Gibod
431445e6a2 fix imports (#25) 2022-08-16 11:29:56 -07:00
Michael Mogenson
d7cc546248 Update supported commands in console.py docs (#24)
Co-authored-by: Michael Mogenson <mogenson@google.com>
2022-08-12 14:23:21 -07:00
Gilles Boccon-Gibod
29fd19f40d gbg/fix subscribe lambda (#23)
* don't use a lambda as a subscriber

* update tests to look at side effects instead of internals
2022-08-12 14:22:31 -07:00
8 changed files with 92 additions and 49 deletions

View File

@@ -29,11 +29,11 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install ".[test,development,documentation]"
python -m pip install ".[build,test,development,documentation]"
- name: Test with pytest
run: |
pytest
- name: Build
run: |
inv build
inv mkdocs
inv build.mkdocs

View File

@@ -315,6 +315,8 @@ class DeviceConfiguration:
self.le_simultaneous_enabled = True
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.connectable = True
self.discoverable = True
self.advertising_data = bytes(
AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
)
@@ -333,6 +335,8 @@ class DeviceConfiguration:
self.le_simultaneous_enabled = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled)
self.classic_sc_enabled = config.get('classic_sc_enabled', self.classic_sc_enabled)
self.classic_ssp_enabled = config.get('classic_ssp_enabled', self.classic_ssp_enabled)
self.connectable = config.get('connectable', self.connectable)
self.discoverable = config.get('discoverable', self.discoverable)
# Load or synthesize an IRK
irk = config.get('irk')
@@ -446,7 +450,8 @@ class Device(CompositeEventEmitter):
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self)
self.l2cap_channel_manager = l2cap.ChannelManager()
self.l2cap_channel_manager = l2cap.ChannelManager(
[l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS])
self.advertisement_data = {}
self.scanning = False
self.discovering = False
@@ -454,8 +459,6 @@ class Device(CompositeEventEmitter):
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.classic_enabled = False
self.discoverable = False
self.connectable = False
self.inquiry_response = None
self.address_resolver = None
@@ -476,6 +479,8 @@ class Device(CompositeEventEmitter):
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled
self.discoverable = config.discoverable
self.connectable = config.connectable
# If a name is passed, override the name from the config
if name:
@@ -490,6 +495,8 @@ class Device(CompositeEventEmitter):
# Setup SMP
# TODO: allow using a public address
self.smp_manager = smp.Manager(self, self.random_address)
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_CID, self.on_smp_pdu)
# Register the SDP server with the L2CAP Channel Manager
self.sdp_server.register(self.l2cap_channel_manager)
@@ -497,6 +504,7 @@ class Device(CompositeEventEmitter):
# Add a GAP Service if requested
if generic_access_service:
self.gatt_server.add_service(GenericAccessService(self.name))
self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu)
# Forward some events
setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription')
@@ -623,6 +631,8 @@ class Device(CompositeEventEmitter):
HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled))
)
await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable)
# Let the SMP manager know about the address
# TODO: allow using a public address
@@ -1497,7 +1507,6 @@ class Device(CompositeEventEmitter):
def on_pairing_failure(self, connection, reason):
connection.emit('pairing_failure', reason)
@host_event_handler
@with_connection_from_handle
def on_gatt_pdu(self, connection, pdu):
# Parse the L2CAP payload into an ATT PDU object
@@ -1516,7 +1525,6 @@ class Device(CompositeEventEmitter):
return
connection.gatt_server.on_gatt_pdu(connection, att_pdu)
@host_event_handler
@with_connection_from_handle
def on_smp_pdu(self, connection, pdu):
self.smp_manager.on_smp_pdu(connection, pdu)

View File

@@ -56,13 +56,7 @@ class Connection:
def on_acl_pdu(self, pdu):
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
if l2cap_pdu.cid == ATT_CID:
self.host.on_gatt_pdu(self, l2cap_pdu.payload)
elif l2cap_pdu.cid == SMP_CID:
self.host.on_smp_pdu(self, l2cap_pdu.payload)
else:
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
# -----------------------------------------------------------------------------
@@ -299,12 +293,6 @@ class Host(EventEmitter):
if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet)
def on_gatt_pdu(self, connection, pdu):
self.emit('gatt_pdu', connection.handle, pdu)
def on_smp_pdu(self, connection, pdu):
self.emit('smp_pdu', connection.handle, pdu)
def on_l2cap_pdu(self, connection, cid, pdu):
self.emit('l2cap_pdu', connection.handle, cid, pdu)

View File

@@ -20,11 +20,11 @@ import logging
import struct
from colors import color
from pyee import EventEmitter
from .core import BT_CENTRAL_ROLE, InvalidStateError, ProtocolError
from .hci import (HCI_LE_Connection_Update_Command, HCI_Object, key_with_value,
name_or_number)
from .utils import EventEmitter
# -----------------------------------------------------------------------------
# Logging
@@ -414,6 +414,18 @@ class L2CAP_Information_Request(L2CAP_Control_Frame):
EXTENDED_FEATURES_SUPPORTED = 0x0002
FIXED_CHANNELS_SUPPORTED = 0x0003
EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001
EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002
EXTENDED_FEATURE_BIDIRECTIONAL_QOS = 0x0004
EXTENDED_FEATURE_ENHANCED_RETRANSMISSION_MODE = 0x0008
EXTENDED_FEATURE_STREAMING_MODE = 0x0010
EXTENDED_FEATURE_FCS_OPTION = 0x0020
EXTENDED_FEATURE_EXTENDED_FLOW_SPEC = 0x0040
EXTENDED_FEATURE_FIXED_CHANNELS = 0x0080
EXTENDED_FEATURE_EXTENDED_WINDOW_SIZE = 0x0100
EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200
EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400
INFO_TYPE_NAMES = {
CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU',
EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED',
@@ -817,11 +829,16 @@ class Channel(EventEmitter):
# -----------------------------------------------------------------------------
class ChannelManager:
def __init__(self):
self.host = None
self.channels = {} # Channels, mapped by connection and cid
self.identifiers = {} # Incrementing identifier values by connection
self.servers = {} # Servers accepting connections, by PSM
def __init__(self, extended_features=None, connectionless_mtu=1024):
self.host = None
self.channels = {} # Channels, mapped by connection and cid
# Fixed channel handlers, mapped by cid
self.fixed_channels = {
L2CAP_SIGNALING_CID: None, L2CAP_LE_SIGNALING_CID: None}
self.identifiers = {} # Incrementing identifier values by connection
self.servers = {} # Servers accepting connections, by PSM
self.extended_features = [] if extended_features is None else extended_features
self.connectionless_mtu = connectionless_mtu
def find_channel(self, connection_handle, cid):
if connection_channels := self.channels.get(connection_handle):
@@ -840,6 +857,13 @@ class ChannelManager:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
self.identifiers[connection.handle] = identifier
return identifier
def register_fixed_channel(self, cid, handler):
self.fixed_channels[cid] = handler
def deregister_fixed_channel(self, cid):
if cid in self.fixed_channels:
del self.fixed_channels[cid]
def register_server(self, psm, server):
self.servers[psm] = server
@@ -855,6 +879,8 @@ class ChannelManager:
control_frame = L2CAP_Control_Frame.from_bytes(pdu)
self.on_control_frame(connection, cid, control_frame)
elif cid in self.fixed_channels:
self.fixed_channels[cid](connection.handle, pdu)
else:
if (channel := self.find_channel(connection.handle, cid)) is None:
logger.warn(color(f'channel not found for 0x{connection.handle:04X}:{cid}', 'red'))
@@ -999,13 +1025,13 @@ class ChannelManager:
def on_l2cap_information_request(self, connection, cid, request):
if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
result = L2CAP_Information_Response.SUCCESS
data = struct.pack('<H', 1024) # TODO: don't use a fixed value
data = self.connectionless_mtu.to_bytes(2, 'little')
elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED:
result = L2CAP_Information_Response.SUCCESS
data = bytes.fromhex('00000000') # TODO: don't use a fixed value
data = sum(self.extended_features).to_bytes(4, 'little')
elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED:
result = L2CAP_Information_Response.SUCCESS
data = bytes.fromhex('FFFFFFFFFFFFFFFF') # TODO: don't use a fixed value
data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little')
else:
result = L2CAP_Information_Request.NO_SUPPORTED

View File

@@ -17,9 +17,10 @@
# -----------------------------------------------------------------------------
import logging
import asyncio
from colors import color
from .utils import EventEmitter
from colors import color
from pyee import EventEmitter
from .core import InvalidStateError, ProtocolError, ConnectionError
# -----------------------------------------------------------------------------

View File

@@ -7,10 +7,12 @@ The Console app is an interactive text user interface that offers a number of fu
* scanning
* advertising
* connecting to devices
* connecting to and disconnecting from devices
* changing connection parameters
* enabling encryption
* discovering GATT services and characteristics
* read & write GATT characteristics
* reading and writing GATT characteristics
* subscribing to and unsubscribing from GATT characteristics
The console user interface has 3 main panes:

View File

@@ -57,12 +57,13 @@ console_scripts =
bumble-link-relay = bumble.apps.link_relay.link_relay:main
[options.extras_require]
build =
build >= 0.7
test =
pytest >= 6.2
pytest-asyncio >= 0.17
development =
invoke >= 1.4
build >= 0.7
nox >= 2022
documentation =
mkdocs >= 1.2.3

View File

@@ -23,35 +23,52 @@ ROOT_DIR = os.path.dirname(os.path.realpath(__file__))
ns = Collection()
# Building
build_tasks = Collection()
ns.add_collection(build_tasks, name='build')
ns.add_collection(build_tasks, name="build")
@task
def build(ctx):
ctx.run('python -m build')
def build(ctx, install=False):
if install:
ctx.run('python -m pip install .[build]')
build_tasks.add_task(build, default=True, name='build')
ctx.run("python -m build")
build_tasks.add_task(build, default=True)
@task
def release_build(ctx):
build(ctx, install=True)
build_tasks.add_task(release_build, name="release")
@task
def mkdocs(ctx):
ctx.run("mkdocs build -f docs/mkdocs/mkdocs.yml")
build_tasks.add_task(mkdocs, name="mkdocs")
# Testing
test_tasks = Collection()
ns.add_collection(test_tasks, name='test')
ns.add_collection(test_tasks, name="test")
@task
def test(ctx, filter=None, junit=False):
def test(ctx, filter=None, junit=False, install=False):
# Install the package before running the tests
if install:
ctx.run("python -m pip install .[test]")
args = ""
if junit:
args += "--junit-xml test-results.xml"
if filter is not None:
args += " -k '{}'".format(filter)
ctx.run('python -m pytest {} {}'
.format(os.path.join(ROOT_DIR, "tests"), args))
test_tasks.add_task(test, name='test', default=True)
ctx.run("python -m pytest {} {}".format(os.path.join(ROOT_DIR, "tests"), args))
test_tasks.add_task(test, default=True)
@task
def mkdocs(ctx):
ctx.run('mkdocs build -f docs/mkdocs/mkdocs.yml')
def release_test(ctx):
test(ctx, install=True)
ns.add_task(mkdocs)
test_tasks.add_task(release_test, name="release")