mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
Remove unncecesary steps for injecting Python event loop
* Context vars can be injected directly into Rust future and spawned with tokio
This commit is contained in:
@@ -21,8 +21,7 @@
|
||||
/// TCP client to connect.
|
||||
/// When the L2CAP CoC channel is closed, the TCP connection is closed as well.
|
||||
use crate::cli::l2cap::{
|
||||
proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals,
|
||||
BridgeData,
|
||||
inject_py_event_loop, proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, BridgeData,
|
||||
};
|
||||
use bumble::wrapper::{
|
||||
device::{Connection, Device},
|
||||
@@ -85,11 +84,12 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> {
|
||||
let mtu = args.mtu;
|
||||
let mps = args.mps;
|
||||
let ble_connection = Arc::new(Mutex::new(ble_connection));
|
||||
// Ensure Python event loop is available to l2cap `disconnect`
|
||||
let _ = run_future_with_current_task_locals(async move {
|
||||
// spawn thread to handle incoming tcp connections
|
||||
tokio::spawn(inject_py_event_loop(async move {
|
||||
while let Ok((tcp_stream, addr)) = listener.accept().await {
|
||||
let ble_connection = ble_connection.clone();
|
||||
let _ = run_future_with_current_task_locals(proxy_data_between_tcp_and_l2cap(
|
||||
// spawn thread to handle this specific tcp connection
|
||||
if let Ok(future) = inject_py_event_loop(proxy_data_between_tcp_and_l2cap(
|
||||
ble_connection,
|
||||
tcp_stream,
|
||||
addr,
|
||||
@@ -97,10 +97,11 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> {
|
||||
max_credits,
|
||||
mtu,
|
||||
mps,
|
||||
));
|
||||
)) {
|
||||
tokio::spawn(future);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
})?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use crate::L2cap;
|
||||
use anyhow::anyhow;
|
||||
use bumble::wrapper::{device::Device, l2cap::LeConnectionOrientedChannel, transport::Transport};
|
||||
use owo_colors::{colors::css::Orange, OwoColorize};
|
||||
use pyo3::{PyObject, PyResult, Python};
|
||||
use pyo3::{PyResult, Python};
|
||||
use std::{future::Future, path::PathBuf, sync::Arc};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
@@ -170,21 +170,12 @@ async fn proxy_tcp_rx_to_l2cap_tx(
|
||||
}
|
||||
}
|
||||
|
||||
/// Copies the current thread's TaskLocals into a Python "awaitable" and encapsulates it in a Rust
|
||||
/// future, running it as a Python Task.
|
||||
/// `TaskLocals` stores the current event loop, and allows the user to copy the current Python
|
||||
/// context if necessary. In this case, the python event loop is used when calling `disconnect` on
|
||||
/// an l2cap connection, or else the call will fail.
|
||||
pub fn run_future_with_current_task_locals<F>(
|
||||
fut: F,
|
||||
) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send>
|
||||
/// Copies the current thread's Python even loop (contained in `TaskLocals`) into the given future.
|
||||
/// Useful when sending work to another thread that calls Python code which calls `get_running_loop()`.
|
||||
pub fn inject_py_event_loop<F, R>(fut: F) -> PyResult<impl Future<Output = R>>
|
||||
where
|
||||
F: Future<Output = PyResult<()>> + Send + 'static,
|
||||
F: Future<Output = R> + Send + 'static,
|
||||
{
|
||||
Python::with_gil(|py| {
|
||||
let locals = pyo3_asyncio::tokio::get_current_locals(py)?;
|
||||
let future = pyo3_asyncio::tokio::scope(locals.clone(), fut);
|
||||
pyo3_asyncio::tokio::future_into_py_with_locals(py, locals, future)
|
||||
.and_then(pyo3_asyncio::tokio::into_future)
|
||||
})
|
||||
let locals = Python::with_gil(pyo3_asyncio::tokio::get_current_locals)?;
|
||||
Ok(pyo3_asyncio::tokio::scope(locals, fut))
|
||||
}
|
||||
|
||||
@@ -19,10 +19,7 @@
|
||||
/// When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket
|
||||
/// and waits for a new L2CAP CoC channel to be connected.
|
||||
/// When the TCP connection is closed by the TCP server, the L2CAP connection is closed as well.
|
||||
use crate::cli::l2cap::{
|
||||
proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals,
|
||||
BridgeData,
|
||||
};
|
||||
use crate::cli::l2cap::{proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, BridgeData};
|
||||
use bumble::wrapper::{device::Device, hci::HciConstant, l2cap::LeConnectionOrientedChannel};
|
||||
use futures::executor::block_on;
|
||||
use owo_colors::OwoColorize;
|
||||
@@ -49,19 +46,19 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> {
|
||||
let port = args.tcp_port;
|
||||
device.register_l2cap_channel_server(
|
||||
args.psm,
|
||||
move |_py, l2cap_channel| {
|
||||
move |py, l2cap_channel| {
|
||||
let channel_info = l2cap_channel
|
||||
.debug_string()
|
||||
.unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})"));
|
||||
println!("{} {channel_info}", "*** L2CAP channel:".cyan());
|
||||
|
||||
let host = host.clone();
|
||||
// Ensure Python event loop is available to l2cap `disconnect`
|
||||
let _ = run_future_with_current_task_locals(proxy_data_between_l2cap_and_tcp(
|
||||
l2cap_channel,
|
||||
host,
|
||||
port,
|
||||
));
|
||||
// Handles setting up a tokio runtime that runs this future to completion while also
|
||||
// containing the necessary context vars.
|
||||
pyo3_asyncio::tokio::future_into_py(
|
||||
py,
|
||||
proxy_data_between_l2cap_and_tcp(l2cap_channel, host, port),
|
||||
)?;
|
||||
Ok(())
|
||||
},
|
||||
args.max_credits,
|
||||
|
||||
Reference in New Issue
Block a user