diff --git a/rust/src/cli/l2cap/client_bridge.rs b/rust/src/cli/l2cap/client_bridge.rs index 37606fc5..31bc0213 100644 --- a/rust/src/cli/l2cap/client_bridge.rs +++ b/rust/src/cli/l2cap/client_bridge.rs @@ -21,8 +21,7 @@ /// TCP client to connect. /// When the L2CAP CoC channel is closed, the TCP connection is closed as well. use crate::cli::l2cap::{ - proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, - BridgeData, + inject_py_event_loop, proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, BridgeData, }; use bumble::wrapper::{ device::{Connection, Device}, @@ -85,11 +84,12 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { let mtu = args.mtu; let mps = args.mps; let ble_connection = Arc::new(Mutex::new(ble_connection)); - // Ensure Python event loop is available to l2cap `disconnect` - let _ = run_future_with_current_task_locals(async move { + // spawn thread to handle incoming tcp connections + tokio::spawn(inject_py_event_loop(async move { while let Ok((tcp_stream, addr)) = listener.accept().await { let ble_connection = ble_connection.clone(); - let _ = run_future_with_current_task_locals(proxy_data_between_tcp_and_l2cap( + // spawn thread to handle this specific tcp connection + if let Ok(future) = inject_py_event_loop(proxy_data_between_tcp_and_l2cap( ble_connection, tcp_stream, addr, @@ -97,10 +97,11 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { max_credits, mtu, mps, - )); + )) { + tokio::spawn(future); + } } - Ok(()) - }); + })?); Ok(()) } diff --git a/rust/src/cli/l2cap/mod.rs b/rust/src/cli/l2cap/mod.rs index 31097edb..3f86b24d 100644 --- a/rust/src/cli/l2cap/mod.rs +++ b/rust/src/cli/l2cap/mod.rs @@ -18,7 +18,7 @@ use crate::L2cap; use anyhow::anyhow; use bumble::wrapper::{device::Device, l2cap::LeConnectionOrientedChannel, transport::Transport}; use owo_colors::{colors::css::Orange, OwoColorize}; -use pyo3::{PyObject, PyResult, Python}; +use pyo3::{PyResult, Python}; use std::{future::Future, path::PathBuf, sync::Arc}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -170,21 +170,12 @@ async fn proxy_tcp_rx_to_l2cap_tx( } } -/// Copies the current thread's TaskLocals into a Python "awaitable" and encapsulates it in a Rust -/// future, running it as a Python Task. -/// `TaskLocals` stores the current event loop, and allows the user to copy the current Python -/// context if necessary. In this case, the python event loop is used when calling `disconnect` on -/// an l2cap connection, or else the call will fail. -pub fn run_future_with_current_task_locals( - fut: F, -) -> PyResult> + Send> +/// Copies the current thread's Python even loop (contained in `TaskLocals`) into the given future. +/// Useful when sending work to another thread that calls Python code which calls `get_running_loop()`. +pub fn inject_py_event_loop(fut: F) -> PyResult> where - F: Future> + Send + 'static, + F: Future + Send + 'static, { - Python::with_gil(|py| { - let locals = pyo3_asyncio::tokio::get_current_locals(py)?; - let future = pyo3_asyncio::tokio::scope(locals.clone(), fut); - pyo3_asyncio::tokio::future_into_py_with_locals(py, locals, future) - .and_then(pyo3_asyncio::tokio::into_future) - }) + let locals = Python::with_gil(pyo3_asyncio::tokio::get_current_locals)?; + Ok(pyo3_asyncio::tokio::scope(locals, fut)) } diff --git a/rust/src/cli/l2cap/server_bridge.rs b/rust/src/cli/l2cap/server_bridge.rs index 3a32db92..3f8041a4 100644 --- a/rust/src/cli/l2cap/server_bridge.rs +++ b/rust/src/cli/l2cap/server_bridge.rs @@ -19,10 +19,7 @@ /// When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket /// and waits for a new L2CAP CoC channel to be connected. /// When the TCP connection is closed by the TCP server, the L2CAP connection is closed as well. -use crate::cli::l2cap::{ - proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, - BridgeData, -}; +use crate::cli::l2cap::{proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, BridgeData}; use bumble::wrapper::{device::Device, hci::HciConstant, l2cap::LeConnectionOrientedChannel}; use futures::executor::block_on; use owo_colors::OwoColorize; @@ -49,19 +46,19 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { let port = args.tcp_port; device.register_l2cap_channel_server( args.psm, - move |_py, l2cap_channel| { + move |py, l2cap_channel| { let channel_info = l2cap_channel .debug_string() .unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})")); println!("{} {channel_info}", "*** L2CAP channel:".cyan()); let host = host.clone(); - // Ensure Python event loop is available to l2cap `disconnect` - let _ = run_future_with_current_task_locals(proxy_data_between_l2cap_and_tcp( - l2cap_channel, - host, - port, - )); + // Handles setting up a tokio runtime that runs this future to completion while also + // containing the necessary context vars. + pyo3_asyncio::tokio::future_into_py( + py, + proxy_data_between_l2cap_and_tcp(l2cap_channel, host, port), + )?; Ok(()) }, args.max_credits,