mirror of
https://github.com/google/bumble.git
synced 2026-04-17 00:35:31 +00:00
114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
# Copyright 2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import annotations
|
|
import contextlib
|
|
import functools
|
|
import grpc
|
|
import inspect
|
|
import logging
|
|
|
|
from bumble.device import Device
|
|
from bumble.hci import Address
|
|
from google.protobuf.message import Message # pytype: disable=pyi-error
|
|
from typing import Any, Dict, Generator, MutableMapping, Optional, Tuple
|
|
|
|
# Address type to use for each supported address field name; looked up by
# `address_from_request` with the name of the request field being read.
ADDRESS_TYPES: Dict[str, int] = {
    "public": Address.PUBLIC_DEVICE_ADDRESS,
    "random": Address.RANDOM_DEVICE_ADDRESS,
    "public_identity": Address.PUBLIC_IDENTITY_ADDRESS,
    "random_static_identity": Address.RANDOM_IDENTITY_ADDRESS,
}
|
|
|
|
|
|
def address_from_request(request: Message, field: Optional[str]) -> Address:
    """Build an `Address` from the named field of a request message.

    Args:
        request: Message holding the address bytes in the attribute `field`.
        field: Name of the attribute to read; it is also the key used to look
            up the address type in `ADDRESS_TYPES`. `None` means no address
            field is set.

    Returns:
        `Address.ANY` when `field` is `None`; otherwise an `Address` built
        from the field's bytes in reversed order, with the address type
        registered for `field`.

    Raises:
        KeyError: If `field` is not a key of `ADDRESS_TYPES`.
    """
    if field is not None:
        raw_value = getattr(request, field)
        # The bytes are handed to Address in the opposite order from the one
        # in which the request stores them.
        return Address(bytes(reversed(raw_value)), ADDRESS_TYPES[field])
    return Address.ANY
|
|
|
|
|
|
class BumbleServerLoggerAdapter(logging.LoggerAdapter):  # type: ignore
    """Prefix log messages with the service name and a short device address.

    The adapter's `extra` mapping must provide a `'service_name'` string and
    a `'device'` that is a bumble `Device`.
    """

    def process(
        self, msg: str, kwargs: MutableMapping[str, Any]
    ) -> Tuple[str, MutableMapping[str, Any]]:
        """Return `msg` prefixed with `[bumble.<service>:<addr>]`.

        `<addr>` is the colon-separated, upper-case hex rendering of the
        bytes left after reversing the device's public address and dropping
        the first four of them. `kwargs` is returned untouched.
        """
        extra = self.extra
        assert extra

        service_name = extra['service_name']
        assert isinstance(service_name, str)

        device = extra['device']
        assert isinstance(device, Device)

        raw_address = bytes(device.public_address)  # pytype: disable=attribute-error
        # Reverse the byte order, then keep only what follows the first four
        # bytes so the prefix stays short.
        short_address = raw_address[::-1][4:]
        addr = ':'.join(f'{octet:02X}' for octet in short_address)

        return (f'[bumble.{service_name}:{addr}] {msg}', kwargs)
|
|
|
|
|
|
@contextlib.contextmanager
def exception_to_rpc_error(
    context: grpc.ServicerContext,
) -> Generator[None, None, None]:
    """Report selected exceptions raised in the `with` body as gRPC errors.

    The exception is not re-raised: its status code and message are recorded
    on `context` and the `with` statement then completes normally.

    Mapping (checked in this order):
        NotImplementedError -> grpc.StatusCode.UNIMPLEMENTED
        ValueError          -> grpc.StatusCode.INVALID_ARGUMENT
        RuntimeError        -> grpc.StatusCode.ABORTED

    Any other exception type propagates unchanged.

    Args:
        context: The servicer context on which the status code and details
            are set.
    """
    try:
        yield None
    except (NotImplementedError, ValueError, RuntimeError) as error:
        # NotImplementedError is a subclass of RuntimeError, so it has to be
        # tested first to get its own status code.
        if isinstance(error, NotImplementedError):
            status = grpc.StatusCode.UNIMPLEMENTED
        elif isinstance(error, ValueError):
            status = grpc.StatusCode.INVALID_ARGUMENT
        else:
            status = grpc.StatusCode.ABORTED
        context.set_code(status)  # type: ignore
        context.set_details(str(error))  # type: ignore
|
|
|
|
|
|
def rpc(func: Any) -> Any:
    """Decorate an RPC servicer method so its exceptions become gRPC errors.

    The returned wrapper runs `func(self, request, context)` inside
    `exception_to_rpc_error(context)`. Four wrapper flavours exist so that
    the wrapper has the same calling convention as `func`: async generator,
    coroutine, synchronous generator and plain function.

    Args:
        func: The servicer method to wrap, taking `(self, request, context)`.

    Returns:
        A wrapper of the same flavour as `func`, carrying its metadata via
        `functools.wraps`.
    """

    @functools.wraps(func)
    async def asyncgen_wrapper(
        self: Any, request: Any, context: grpc.ServicerContext
    ) -> Any:
        # Iterate inside the `with` block so that errors raised while
        # streaming are translated too, not just errors raised on the call.
        with exception_to_rpc_error(context):
            async for v in func(self, request, context):
                yield v

    @functools.wraps(func)
    async def async_wrapper(
        self: Any, request: Any, context: grpc.ServicerContext
    ) -> Any:
        with exception_to_rpc_error(context):
            return await func(self, request, context)

    @functools.wraps(func)
    def gen_wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any:
        # Same reasoning as the async generator: the generator body only runs
        # while it is being iterated, so the iteration must be guarded.
        with exception_to_rpc_error(context):
            for v in func(self, request, context):
                yield v

    @functools.wraps(func)
    def wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any:
        with exception_to_rpc_error(context):
            return func(self, request, context)

    if inspect.isasyncgenfunction(func):
        return asyncgen_wrapper

    if inspect.iscoroutinefunction(func):
        return async_wrapper

    # `inspect.isgenerator` only recognises generator *objects* and is always
    # False for a function; `isgeneratorfunction` is the check that detects a
    # synchronous streaming method.
    if inspect.isgeneratorfunction(func):
        return gen_wrapper

    return wrapper
|