forked from auracaster/bumble_mirror
Further adventures in porting tools to Rust to flesh out the supported API. These tools didn't feel like `example`s, so I made a top-level `bumble` CLI tool that hosts them all as subcommands. I also moved the usb probe (not really an `example`) into it as well. I'm open to suggestions on how best to organize the subcommands to make them intuitive to explore with `--help`, and how to leave room for other future tools. I also adopted the per-OS project data dir as the default firmware location, so that users can download once and then use those .bin files from anywhere without having to sprinkle .bin files in project directories or reach inside the python package dir hierarchy.
142 lines
4.9 KiB
Rust
142 lines
4.9 KiB
Rust
// Copyright 2023 Google LLC
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
//! Drivers for Realtek controllers
|
|
|
|
use crate::wrapper::{host::Host, PyObjectExt};
|
|
use pyo3::{intern, types::PyModule, PyObject, PyResult, Python, ToPyObject};
|
|
use pyo3_asyncio::tokio::into_future;
|
|
|
|
pub use crate::internal::drivers::rtk::{Firmware, Patch};
|
|
|
|
/// Driver for a Realtek controller
|
|
pub struct Driver(PyObject);
|
|
|
|
impl Driver {
|
|
/// Locate the driver for the provided host.
|
|
pub async fn for_host(host: &Host, force: bool) -> PyResult<Option<Self>> {
|
|
Python::with_gil(|py| {
|
|
PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
|
|
.getattr(intern!(py, "Driver"))?
|
|
.call_method1(intern!(py, "for_host"), (&host.obj, force))
|
|
.and_then(into_future)
|
|
})?
|
|
.await
|
|
.map(|obj| obj.into_option(Self))
|
|
}
|
|
|
|
/// Check if the host has a known driver.
|
|
pub async fn check(host: &Host) -> PyResult<bool> {
|
|
Python::with_gil(|py| {
|
|
PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
|
|
.getattr(intern!(py, "Driver"))?
|
|
.call_method1(intern!(py, "check"), (&host.obj,))
|
|
.and_then(|obj| obj.extract::<bool>())
|
|
})
|
|
}
|
|
|
|
/// Find the [DriverInfo] for the host, if one matches
|
|
pub async fn driver_info_for_host(host: &Host) -> PyResult<Option<DriverInfo>> {
|
|
Python::with_gil(|py| {
|
|
PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
|
|
.getattr(intern!(py, "Driver"))?
|
|
.call_method1(intern!(py, "driver_info_for_host"), (&host.obj,))
|
|
.and_then(into_future)
|
|
})?
|
|
.await
|
|
.map(|obj| obj.into_option(DriverInfo))
|
|
}
|
|
|
|
/// Send a command to the device to drop firmware
|
|
pub async fn drop_firmware(host: &mut Host) -> PyResult<()> {
|
|
Python::with_gil(|py| {
|
|
PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
|
|
.getattr(intern!(py, "Driver"))?
|
|
.call_method1(intern!(py, "drop_firmware"), (&host.obj,))
|
|
.and_then(into_future)
|
|
})?
|
|
.await
|
|
.map(|_| ())
|
|
}
|
|
|
|
/// Load firmware onto the device.
|
|
pub async fn download_firmware(&mut self) -> PyResult<()> {
|
|
Python::with_gil(|py| {
|
|
self.0
|
|
.call_method0(py, intern!(py, "download_firmware"))
|
|
.and_then(|coroutine| into_future(coroutine.as_ref(py)))
|
|
})?
|
|
.await
|
|
.map(|_| ())
|
|
}
|
|
}
|
|
|
|
/// Metadata about a known driver & applicable device
|
|
pub struct DriverInfo(PyObject);
|
|
|
|
impl DriverInfo {
|
|
/// Returns a list of all drivers that Bumble knows how to handle.
|
|
pub fn all_drivers() -> PyResult<Vec<DriverInfo>> {
|
|
Python::with_gil(|py| {
|
|
PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
|
|
.getattr(intern!(py, "Driver"))?
|
|
.getattr(intern!(py, "DRIVER_INFOS"))?
|
|
.iter()?
|
|
.map(|r| r.map(|h| DriverInfo(h.to_object(py))))
|
|
.collect::<PyResult<Vec<_>>>()
|
|
})
|
|
}
|
|
|
|
/// The firmware file name to load from the filesystem, e.g. `foo_fw.bin`.
|
|
pub fn firmware_name(&self) -> PyResult<String> {
|
|
Python::with_gil(|py| {
|
|
self.0
|
|
.getattr(py, intern!(py, "fw_name"))?
|
|
.as_ref(py)
|
|
.extract::<String>()
|
|
})
|
|
}
|
|
|
|
/// The config file name, if any, to load from the filesystem, e.g. `foo_config.bin`.
|
|
pub fn config_name(&self) -> PyResult<Option<String>> {
|
|
Python::with_gil(|py| {
|
|
let obj = self.0.getattr(py, intern!(py, "config_name"))?;
|
|
let handle = obj.as_ref(py);
|
|
|
|
if handle.is_none() {
|
|
Ok(None)
|
|
} else {
|
|
handle
|
|
.extract::<String>()
|
|
.map(|s| if s.is_empty() { None } else { Some(s) })
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Whether or not config is required.
|
|
pub fn config_needed(&self) -> PyResult<bool> {
|
|
Python::with_gil(|py| {
|
|
self.0
|
|
.getattr(py, intern!(py, "config_needed"))?
|
|
.as_ref(py)
|
|
.extract::<bool>()
|
|
})
|
|
}
|
|
|
|
/// ROM id
|
|
pub fn rom(&self) -> PyResult<u32> {
|
|
Python::with_gil(|py| self.0.getattr(py, intern!(py, "rom"))?.as_ref(py).extract())
|
|
}
|
|
}
|