forked from auracaster/bumble_mirror
pandora: import bumble pandora server from avatar
This commit is contained in:
@@ -0,0 +1,112 @@
|
||||
# Copyright 2022 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import contextlib
|
||||
import functools
|
||||
import grpc
|
||||
import inspect
|
||||
import logging
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.hci import Address
|
||||
from google.protobuf.message import Message # pytype: disable=pyi-error
|
||||
from typing import Any, Dict, Generator, MutableMapping, Optional, Tuple
|
||||
|
||||
ADDRESS_TYPES: Dict[str, int] = {
|
||||
"public": Address.PUBLIC_DEVICE_ADDRESS,
|
||||
"random": Address.RANDOM_DEVICE_ADDRESS,
|
||||
"public_identity": Address.PUBLIC_IDENTITY_ADDRESS,
|
||||
"random_static_identity": Address.RANDOM_IDENTITY_ADDRESS,
|
||||
}
|
||||
|
||||
|
||||
def address_from_request(request: Message, field: Optional[str]) -> Address:
|
||||
if field is None:
|
||||
return Address.ANY
|
||||
return Address(bytes(reversed(getattr(request, field))), ADDRESS_TYPES[field])
|
||||
|
||||
|
||||
class BumbleServerLoggerAdapter(logging.LoggerAdapter): # type: ignore
|
||||
"""Formats logs from the PandoraClient."""
|
||||
|
||||
def process(
|
||||
self, msg: str, kwargs: MutableMapping[str, Any]
|
||||
) -> Tuple[str, MutableMapping[str, Any]]:
|
||||
assert self.extra
|
||||
service_name = self.extra['service_name']
|
||||
assert isinstance(service_name, str)
|
||||
device = self.extra['device']
|
||||
assert isinstance(device, Device)
|
||||
addr_bytes = bytes(
|
||||
reversed(bytes(device.public_address))
|
||||
) # pytype: disable=attribute-error
|
||||
addr = ':'.join([f'{x:02X}' for x in addr_bytes[4:]])
|
||||
return (f'[bumble.{service_name}:{addr}] {msg}', kwargs)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def exception_to_rpc_error(
|
||||
context: grpc.ServicerContext,
|
||||
) -> Generator[None, None, None]:
|
||||
try:
|
||||
yield None
|
||||
except NotImplementedError as e:
|
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
|
||||
context.set_details(str(e)) # type: ignore
|
||||
except ValueError as e:
|
||||
context.set_code(grpc.StatusCode.INVALID_ARGUMENT) # type: ignore
|
||||
context.set_details(str(e)) # type: ignore
|
||||
except RuntimeError as e:
|
||||
context.set_code(grpc.StatusCode.ABORTED) # type: ignore
|
||||
context.set_details(str(e)) # type: ignore
|
||||
|
||||
|
||||
# Decorate an RPC servicer method with a wrapper that transform exceptions to gRPC errors.
|
||||
def rpc(func: Any) -> Any:
|
||||
@functools.wraps(func)
|
||||
async def asyncgen_wrapper(
|
||||
self: Any, request: Any, context: grpc.ServicerContext
|
||||
) -> Any:
|
||||
with exception_to_rpc_error(context):
|
||||
async for v in func(self, request, context):
|
||||
yield v
|
||||
|
||||
@functools.wraps(func)
|
||||
async def async_wrapper(
|
||||
self: Any, request: Any, context: grpc.ServicerContext
|
||||
) -> Any:
|
||||
with exception_to_rpc_error(context):
|
||||
return await func(self, request, context)
|
||||
|
||||
@functools.wraps(func)
|
||||
def gen_wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any:
|
||||
with exception_to_rpc_error(context):
|
||||
for v in func(self, request, context):
|
||||
yield v
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any:
|
||||
with exception_to_rpc_error(context):
|
||||
return func(self, request, context)
|
||||
|
||||
if inspect.isasyncgenfunction(func):
|
||||
return asyncgen_wrapper
|
||||
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
|
||||
if inspect.isgenerator(func):
|
||||
return gen_wrapper
|
||||
|
||||
return wrapper
|
||||
Reference in New Issue
Block a user