// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! HCI use itertools::Itertools as _; use pyo3::{ exceptions::PyException, intern, types::PyModule, FromPyObject, PyAny, PyErr, PyObject, PyResult, Python, ToPyObject, }; /// HCI error code. pub struct HciErrorCode(u8); impl<'source> FromPyObject<'source> for HciErrorCode { fn extract(ob: &'source PyAny) -> PyResult { Ok(HciErrorCode(ob.extract()?)) } } impl ToPyObject for HciErrorCode { fn to_object(&self, py: Python<'_>) -> PyObject { self.0.to_object(py) } } /// Provides helpers for interacting with HCI pub struct HciConstant; impl HciConstant { /// Human-readable error name pub fn error_name(status: HciErrorCode) -> PyResult { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.hci"))? .getattr(intern!(py, "HCI_Constant"))? .call_method1(intern!(py, "error_name"), (status.0,))? .extract() }) } } /// A Bluetooth address pub struct Address(pub(crate) PyObject); impl Address { /// The type of address pub fn address_type(&self) -> PyResult { Python::with_gil(|py| { let addr_type = self .0 .getattr(py, intern!(py, "address_type"))? .extract::(py)?; let module = PyModule::import(py, intern!(py, "bumble.hci"))?; let klass = module.getattr(intern!(py, "Address"))?; if addr_type == klass .getattr(intern!(py, "PUBLIC_DEVICE_ADDRESS"))? .extract::()? { Ok(AddressType::PublicDevice) } else if addr_type == klass .getattr(intern!(py, "RANDOM_DEVICE_ADDRESS"))? .extract::()? { Ok(AddressType::RandomDevice) } else if addr_type == klass .getattr(intern!(py, "PUBLIC_IDENTITY_ADDRESS"))? .extract::()? { Ok(AddressType::PublicIdentity) } else if addr_type == klass .getattr(intern!(py, "RANDOM_IDENTITY_ADDRESS"))? .extract::()? { Ok(AddressType::RandomIdentity) } else { Err(PyErr::new::("Invalid address type")) } }) } /// True if the address is static pub fn is_static(&self) -> PyResult { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "is_static"))? .extract::(py) }) } /// True if the address is resolvable pub fn is_resolvable(&self) -> PyResult { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "is_resolvable"))? .extract::(py) }) } /// Address bytes in _little-endian_ format pub fn as_le_bytes(&self) -> PyResult> { Python::with_gil(|py| { self.0 .call_method0(py, intern!(py, "to_bytes"))? .extract::>(py) }) } /// Address bytes as big-endian colon-separated hex pub fn as_hex(&self) -> PyResult { self.as_le_bytes().map(|bytes| { bytes .into_iter() .rev() .map(|byte| hex::encode_upper([byte])) .join(":") }) } } /// BT address types #[allow(missing_docs)] #[derive(PartialEq, Eq, Debug)] pub enum AddressType { PublicDevice, RandomDevice, PublicIdentity, RandomIdentity, }