Add python async wrapper, move hci non-wrapper to internal, add hci::internal tests

This commit is contained in:
Gabriel White-Vega
2023-09-28 15:55:23 -04:00
parent 7e331c2944
commit 511ab4b630
11 changed files with 380 additions and 214 deletions

View File

@@ -194,21 +194,6 @@ class Controller:
self.terminated = asyncio.get_running_loop().create_future()
@classmethod
async def create(
cls,
name,
host_source=None,
host_sink: Optional[TransportSink] = None,
link=None,
public_address: Optional[Union[bytes, str, Address]] = None,
):
'''
Rust's pyo3_asyncio needs the constructor to be async in order to properly
inject a running loop for creating the `terminated` future.
'''
return Controller(name, host_source, host_sink, link, public_address)
@property
def host(self):
return self.hci_sink

View File

@@ -33,7 +33,7 @@ from typing import (
Union,
overload,
)
from functools import wraps
from functools import wraps, partial
from pyee import EventEmitter
from .colors import color
@@ -410,3 +410,20 @@ class FlowControlAsyncPipe:
self.resume_source()
self.check_pump()
async def async_call(function, *args, **kwargs):
"""
Immediately calls the function with provided args and kwargs, wrapping it in an async function.
Rust's `pyo3_asyncio` library needs functions to be marked async to properly inject a running loop.
result = await async_call(some_function, ...)
"""
return function(*args, **kwargs)
def wrap_async(function):
"""
Wraps the provided function in an async function.
"""
return partial(async_call, function)