// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! HCI pub use crate::internal::hci::{packets, Error, Packet}; use crate::{ internal::hci::WithPacketType, wrapper::hci::packets::{AddressType, Command, ErrorCode}, }; use itertools::Itertools as _; use pyo3::{ exceptions::PyException, intern, pyclass, pymethods, types::{PyBytes, PyModule}, FromPyObject, IntoPy, PyAny, PyErr, PyObject, PyResult, Python, ToPyObject, }; /// Provides helpers for interacting with HCI pub struct HciConstant; impl HciConstant { /// Human-readable error name pub fn error_name(status: ErrorCode) -> PyResult { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.hci"))? .getattr(intern!(py, "HCI_Constant"))? .call_method1(intern!(py, "error_name"), (status.to_object(py),))? .extract() }) } } /// A Bluetooth address #[derive(Clone)] pub struct Address(pub(crate) PyObject); impl Address { /// Creates a new [Address] object pub fn new(address: &str, address_type: &AddressType) -> PyResult { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.device"))? .getattr(intern!(py, "Address"))? .call1((address, address_type.to_object(py))) .map(|any| Self(any.into())) }) } /// The type of address pub fn address_type(&self) -> PyResult { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "address_type"))? .extract::(py)? .try_into() .map_err(|addr_type| { PyErr::new::(format!( "Failed to convert {addr_type} to AddressType" )) }) }) } /// True if the address is static pub fn is_static(&self) -> PyResult { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "is_static"))? .extract::(py) }) } /// True if the address is resolvable pub fn is_resolvable(&self) -> PyResult { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "is_resolvable"))? .extract::(py) }) } /// Address bytes in _little-endian_ format pub fn as_le_bytes(&self) -> PyResult> { Python::with_gil(|py| { self.0 .call_method0(py, intern!(py, "to_bytes"))? .extract::>(py) }) } /// Address bytes as big-endian colon-separated hex pub fn as_hex(&self) -> PyResult { self.as_le_bytes().map(|bytes| { bytes .into_iter() .rev() .map(|byte| hex::encode_upper([byte])) .join(":") }) } } impl ToPyObject for Address { fn to_object(&self, _py: Python<'_>) -> PyObject { self.0.clone() } } /// Implements minimum necessary interface to be treated as bumble's [HCI_Command]. /// While pyo3's macros do not support generics, this could probably be refactored to allow multiple /// implementations of the HCI_Command methods in the future, if needed. #[pyclass] pub(crate) struct HciCommandWrapper(pub(crate) Command); #[pymethods] impl HciCommandWrapper { fn __bytes__(&self, py: Python) -> PyResult { let bytes = PyBytes::new(py, &self.0.clone().to_vec_with_packet_type()); Ok(bytes.into_py(py)) } #[getter] fn op_code(&self) -> u16 { self.0.get_op_code().into() } } impl ToPyObject for AddressType { fn to_object(&self, py: Python<'_>) -> PyObject { u8::from(self).to_object(py) } } impl<'source> FromPyObject<'source> for ErrorCode { fn extract(ob: &'source PyAny) -> PyResult { ob.extract() } } impl ToPyObject for ErrorCode { fn to_object(&self, py: Python<'_>) -> PyObject { u8::from(self).to_object(py) } }