Merge pull request #185 from google/gbg/netsim-transport

support new android emulator gRPC interface
This commit is contained in:
Gilles Boccon-Gibod
2023-05-03 08:54:55 -07:00
committed by GitHub
46 changed files with 1753 additions and 597 deletions

View File

@@ -145,6 +145,11 @@ async def _open_transport(name: str) -> Transport:
return await open_android_emulator_transport(spec[0] if spec else None)
if scheme == 'android-netsim':
from .android_netsim import open_android_netsim_transport
return await open_android_netsim_transport(spec[0] if spec else None)
raise ValueError('unknown transport scheme')

View File

@@ -16,14 +16,14 @@
# Imports
# -----------------------------------------------------------------------------
import logging
import grpc
import grpc.aio
from .common import PumpedTransport, PumpedPacketSource, PumpedPacketSink
from .emulated_bluetooth_pb2_grpc import EmulatedBluetoothServiceStub
from .emulated_bluetooth_vhci_pb2_grpc import VhciForwardingServiceStub
# pylint: disable-next=no-name-in-module
from .emulated_bluetooth_packets_pb2 import HCIPacket
# pylint: disable=no-name-in-module
from .grpc_protobuf.emulated_bluetooth_pb2_grpc import EmulatedBluetoothServiceStub
from .grpc_protobuf.emulated_bluetooth_packets_pb2 import HCIPacket
from .grpc_protobuf.emulated_bluetooth_vhci_pb2_grpc import VhciForwardingServiceStub
# -----------------------------------------------------------------------------

View File

@@ -0,0 +1,410 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import atexit
import logging
import grpc.aio
import os
import pathlib
import sys
from typing import Optional
from .common import (
ParserSource,
PumpedTransport,
PumpedPacketSource,
PumpedPacketSink,
Transport,
)
# pylint: disable=no-name-in-module
from .grpc_protobuf.packet_streamer_pb2_grpc import PacketStreamerStub
from .grpc_protobuf.packet_streamer_pb2_grpc import (
PacketStreamerServicer,
add_PacketStreamerServicer_to_server,
)
from .grpc_protobuf.packet_streamer_pb2 import PacketRequest, PacketResponse
from .grpc_protobuf.hci_packet_pb2 import HCIPacket
from .grpc_protobuf.startup_pb2 import Chip, ChipInfo
from .grpc_protobuf.common_pb2 import ChipKind
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
DEFAULT_NAME = 'bumble0'
DEFAULT_MANUFACTURER = 'Bumble'
# -----------------------------------------------------------------------------
def get_ini_dir() -> Optional[pathlib.Path]:
if sys.platform == 'darwin':
if tmpdir := os.getenv('TMPDIR', None):
return pathlib.Path(tmpdir)
if home := os.getenv('HOME', None):
return pathlib.Path(home) / 'Library/Caches/TemporaryItems'
elif sys.platform == 'linux':
if xdg_runtime_dir := os.environ.get('XDG_RUNTIME_DIR', None):
return pathlib.Path(xdg_runtime_dir)
elif sys.platform == 'win32':
if local_app_data_dir := os.environ.get('LOCALAPPDATA', None):
return pathlib.Path(local_app_data_dir) / 'Temp'
return None
# -----------------------------------------------------------------------------
def find_grpc_port() -> int:
if not (ini_dir := get_ini_dir()):
logger.debug('no known directory for .ini file')
return 0
ini_file = ini_dir / 'netsim.ini'
if ini_file.is_file():
logger.debug(f'Found .ini file at {ini_file}')
with open(ini_file, 'r') as ini_file_data:
for line in ini_file_data.readlines():
if '=' in line:
key, value = line.split('=')
if key == 'grpc.port':
logger.debug(f'gRPC port = {value}')
return int(value)
# Not found
return 0
# -----------------------------------------------------------------------------
def publish_grpc_port(grpc_port) -> bool:
if not (ini_dir := get_ini_dir()):
logger.debug('no known directory for .ini file')
return False
if not ini_dir.is_dir():
logger.debug('ini directory does not exist')
return False
ini_file = ini_dir / 'netsim.ini'
try:
ini_file.write_text(f'grpc.port={grpc_port}\n')
logger.debug(f"published gRPC port at {ini_file}")
def cleanup():
logger.debug("removing .ini file")
ini_file.unlink()
atexit.register(cleanup)
return True
except OSError:
logger.debug('failed to write to .ini file')
return False
# -----------------------------------------------------------------------------
async def open_android_netsim_controller_transport(server_host, server_port):
if not server_port:
raise ValueError('invalid port')
if server_host == '_' or not server_host:
server_host = 'localhost'
if not publish_grpc_port(server_port):
logger.warning("unable to publish gRPC port")
class HciDevice:
def __init__(self, context, on_data_received):
self.context = context
self.on_data_received = on_data_received
self.name = None
self.loop = asyncio.get_running_loop()
self.done = self.loop.create_future()
self.task = self.loop.create_task(self.pump())
async def pump(self):
try:
await self.pump_loop()
except asyncio.CancelledError:
logger.debug('Pump task canceled')
self.done.set_result(None)
async def pump_loop(self):
while True:
request = await self.context.read()
if request == grpc.aio.EOF:
logger.debug('End of request stream')
self.done.set_result(None)
return
# If we're not initialized yet, wait for a init packet.
if self.name is None:
if request.WhichOneof('request_type') == 'initial_info':
logger.debug(f'Received initial info: {request}')
# We only accept BLUETOOTH
if request.initial_info.chip.kind != ChipKind.BLUETOOTH:
logger.warning('Unsupported chip type')
error = PacketResponse(error='Unsupported chip type')
await self.context.write(error)
return
self.name = request.initial_info.name
continue
# Expect a data packet
request_type = request.WhichOneof('request_type')
if request_type != 'hci_packet':
logger.warning(f'Unexpected request type: {request_type}')
error = PacketResponse(error='Unexpected request type')
await self.context.write(error)
continue
# Process the packet
data = (
bytes([request.hci_packet.packet_type]) + request.hci_packet.packet
)
logger.debug(f'<<< PACKET: {data.hex()}')
self.on_data_received(data)
def send_packet(self, data):
async def send():
await self.context.write(
PacketResponse(
hci_packet=HCIPacket(packet_type=data[0], packet=data[1:])
)
)
self.loop.create_task(send())
def terminate(self):
self.task.cancel()
async def wait_for_termination(self):
await self.done
class Server(PacketStreamerServicer, ParserSource):
def __init__(self):
PacketStreamerServicer.__init__(self)
ParserSource.__init__(self)
self.device = None
# Create a gRPC server with `so_reuseport=0` so that if there's already
# a server listening on that port, we get an exception.
self.grpc_server = grpc.aio.server(options=(('grpc.so_reuseport', 0),))
add_PacketStreamerServicer_to_server(self, self.grpc_server)
self.grpc_server.add_insecure_port(f'{server_host}:{server_port}')
logger.debug(f'gRPC server listening on {server_host}:{server_port}')
async def start(self):
logger.debug('Starting gRPC server')
await self.grpc_server.start()
async def serve(self):
# Keep serving until terminated.
try:
await self.grpc_server.wait_for_termination()
logger.debug('gRPC server terminated')
except asyncio.CancelledError:
logger.debug('gRPC server cancelled')
await self.grpc_server.stop(None)
def on_packet(self, packet):
if not self.device:
logger.debug('no device, dropping packet')
return
self.device.send_packet(packet)
async def StreamPackets(self, _request_iterator, context):
logger.debug('StreamPackets request')
# Check that we won't already have a device
if self.device:
logger.debug('busy, already serving a device')
return PacketResponse(error='Busy')
# Instantiate a new device
self.device = HciDevice(context, self.parser.feed_data)
# Wait for the device to terminate
logger.debug('Waiting for device to terminate')
try:
await self.device.wait_for_termination()
except asyncio.CancelledError:
logger.debug('Request canceled')
self.device.terminate()
logger.debug('Device terminated')
self.device = None
server = Server()
await server.start()
asyncio.get_running_loop().create_task(server.serve())
class GrpcServerTransport(Transport):
async def close(self):
await super().close()
return GrpcServerTransport(server, server)
# -----------------------------------------------------------------------------
async def open_android_netsim_host_transport(server_host, server_port, options):
# Wrapper for I/O operations
class HciDevice:
def __init__(self, name, manufacturer, hci_device):
self.name = name
self.manufacturer = manufacturer
self.hci_device = hci_device
async def start(self): # Send the startup info
chip_info = ChipInfo(
name=self.name,
chip=Chip(kind=ChipKind.BLUETOOTH, manufacturer=self.manufacturer),
)
logger.debug(f'Sending chip info to netsim: {chip_info}')
await self.hci_device.write(PacketRequest(initial_info=chip_info))
async def read(self):
response = await self.hci_device.read()
response_type = response.WhichOneof('response_type')
if response_type == 'error':
logger.warning(f'received error: {response.error}')
raise RuntimeError(response.error)
elif response_type == 'hci_packet':
return (
bytes([response.hci_packet.packet_type])
+ response.hci_packet.packet
)
raise ValueError('unsupported response type')
async def write(self, packet):
await self.hci_device.write(
PacketRequest(
hci_packet=HCIPacket(packet_type=packet[0], packet=packet[1:])
)
)
name = options.get('name', DEFAULT_NAME)
manufacturer = DEFAULT_MANUFACTURER
if server_host == '_' or not server_host:
server_host = 'localhost'
if not server_port:
# Look for the gRPC config in a .ini file
server_host = 'localhost'
server_port = find_grpc_port()
if not server_port:
raise RuntimeError('gRPC server port not found')
# Connect to the gRPC server
server_address = f'{server_host}:{server_port}'
logger.debug(f'Connecting to gRPC server at {server_address}')
channel = grpc.aio.insecure_channel(server_address)
# Connect as a host
service = PacketStreamerStub(channel)
hci_device = HciDevice(
name=name,
manufacturer=manufacturer,
hci_device=service.StreamPackets(),
)
await hci_device.start()
# Create the transport object
transport = PumpedTransport(
PumpedPacketSource(hci_device.read),
PumpedPacketSink(hci_device.write),
channel.close,
)
transport.start()
return transport
# -----------------------------------------------------------------------------
async def open_android_netsim_transport(spec):
'''
Open a transport connection as a client or server, implementing Android's `netsim`
simulator protocol over gRPC.
The parameter string has this syntax:
[<host>:<port>][<options>]
Where <options> is a ','-separated list of <name>=<value> pairs.
General options:
mode=host|controller (default: host)
Specifies whether the transport is used
to connect *to* a netsim server (netsim is the controller), or accept
connections *as* a netsim-compatible server.
In `host` mode:
The <host>:<port> part is optional. When not specified, the transport
looks for a netsim .ini file, from which it will read the `grpc.backend.port`
property.
Options for this mode are:
name=<name>
The "chip" name, used to identify the "chip" instance. This
may be useful when several clients are connected, since each needs to use a
different name.
In `controller` mode:
The <host>:<port> part is required. <host> may be the address of a local network
interface, or '_' to accept connections on all local network interfaces.
Examples:
(empty string) --> connect to netsim on the port specified in the .ini file
localhost:8555 --> connect to netsim on localhost:8555
name=bumble1 --> connect to netsim, using `bumble1` as the "chip" name.
localhost:8555,name=bumble1 --> connect to netsim on localhost:8555, using
`bumble1` as the "chip" name.
_:8877,mode=controller --> accept connections as a controller on any interface
on port 8877.
'''
# Parse the parameters
params = spec.split(',') if spec else []
if params and ':' in params[0]:
# Explicit <host>:<port>
host, port = params[0].split(':')
params_offset = 1
else:
host = None
port = 0
params_offset = 0
options = {}
for param in params[params_offset:]:
if '=' not in param:
raise ValueError('invalid parameter, expected <name>=<value>')
option_name, option_value = param.split('=')
options[option_name] = option_value
mode = options.get('mode', 'host')
if mode == 'host':
return await open_android_netsim_host_transport(host, port, options)
if mode == 'controller':
if host is None:
raise ValueError('<host>:<port> missing')
return await open_android_netsim_controller_transport(host, port)
raise ValueError('invalid mode option')

View File

@@ -1,45 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth_packets.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n emulated_bluetooth_packets.proto\x12\x1b\x61ndroid.emulation.bluetooth\"\xfb\x01\n\tHCIPacket\x12?\n\x04type\x18\x01 \x01(\x0e\x32\x31.android.emulation.bluetooth.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"\x9c\x01\n\nPacketType\x12\x1b\n\x17PACKET_TYPE_UNSPECIFIED\x10\x00\x12\x1b\n\x17PACKET_TYPE_HCI_COMMAND\x10\x01\x12\x13\n\x0fPACKET_TYPE_ACL\x10\x02\x12\x13\n\x0fPACKET_TYPE_SCO\x10\x03\x12\x15\n\x11PACKET_TYPE_EVENT\x10\x04\x12\x13\n\x0fPACKET_TYPE_ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3'
)
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, 'emulated_bluetooth_packets_pb2', globals()
)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_HCIPACKET._serialized_start = 66
_HCIPACKET._serialized_end = 317
_HCIPACKET_PACKETTYPE._serialized_start = 161
_HCIPACKET_PACKETTYPE._serialized_end = 317
# @@protoc_insertion_point(module_scope)

View File

@@ -1,17 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -1,46 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x18\x65mulated_bluetooth.proto\x12\x1b\x61ndroid.emulation.bluetooth\x1a emulated_bluetooth_packets.proto\"\x19\n\x07RawData\x12\x0e\n\x06packet\x18\x01 \x01(\x0c\x32\xcb\x02\n\x18\x45mulatedBluetoothService\x12\x64\n\x12registerClassicPhy\x12$.android.emulation.bluetooth.RawData\x1a$.android.emulation.bluetooth.RawData(\x01\x30\x01\x12`\n\x0eregisterBlePhy\x12$.android.emulation.bluetooth.RawData\x1a$.android.emulation.bluetooth.RawData(\x01\x30\x01\x12g\n\x11registerHCIDevice\x12&.android.emulation.bluetooth.HCIPacket\x1a&.android.emulation.bluetooth.HCIPacket(\x01\x30\x01\x42\"\n\x1e\x63om.android.emulator.bluetoothP\x01\x62\x06proto3'
)
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'emulated_bluetooth_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\036com.android.emulator.bluetoothP\001'
_RAWDATA._serialized_start = 91
_RAWDATA._serialized_end = 116
_EMULATEDBLUETOOTHSERVICE._serialized_start = 119
_EMULATEDBLUETOOTHSERVICE._serialized_end = 450
# @@protoc_insertion_point(module_scope)

View File

@@ -1,26 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import emulated_bluetooth_packets_pb2 as _emulated_bluetooth_packets_pb2
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional
DESCRIPTOR: _descriptor.FileDescriptor
class RawData(_message.Message):
__slots__ = ["packet"]
PACKET_FIELD_NUMBER: _ClassVar[int]
packet: bytes
def __init__(self, packet: _Optional[bytes] = ...) -> None: ...

View File

@@ -1,244 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
from . import emulated_bluetooth_pb2 as emulated__bluetooth__pb2
class EmulatedBluetoothServiceStub(object):
"""An Emulated Bluetooth Service exposes the emulated bluetooth chip from the
android emulator. It allows you to register emulated bluetooth devices and
control the packets that are exchanged between the device and the world.
This service enables you to establish a "virtual network" of emulated
bluetooth devices that can interact with each other.
Note: This is not yet finalized, it is likely that these definitions will
evolve.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.registerClassicPhy = channel.stream_stream(
'/android.emulation.bluetooth.EmulatedBluetoothService/registerClassicPhy',
request_serializer=emulated__bluetooth__pb2.RawData.SerializeToString,
response_deserializer=emulated__bluetooth__pb2.RawData.FromString,
)
self.registerBlePhy = channel.stream_stream(
'/android.emulation.bluetooth.EmulatedBluetoothService/registerBlePhy',
request_serializer=emulated__bluetooth__pb2.RawData.SerializeToString,
response_deserializer=emulated__bluetooth__pb2.RawData.FromString,
)
self.registerHCIDevice = channel.stream_stream(
'/android.emulation.bluetooth.EmulatedBluetoothService/registerHCIDevice',
request_serializer=emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
response_deserializer=emulated__bluetooth__packets__pb2.HCIPacket.FromString,
)
class EmulatedBluetoothServiceServicer(object):
"""An Emulated Bluetooth Service exposes the emulated bluetooth chip from the
android emulator. It allows you to register emulated bluetooth devices and
control the packets that are exchanged between the device and the world.
This service enables you to establish a "virtual network" of emulated
bluetooth devices that can interact with each other.
Note: This is not yet finalized, it is likely that these definitions will
evolve.
"""
def registerClassicPhy(self, request_iterator, context):
"""Connect device to link layer. This will establish a direct connection
to the emulated bluetooth chip and configure the following:
- Each connection creates a new device and attaches it to the link layer
- Link Layer packets are transmitted directly to the phy
This should be used for classic connections.
This is used to directly connect various android emulators together.
For example a wear device can connect to an android emulator through
this.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def registerBlePhy(self, request_iterator, context):
"""Connect device to link layer. This will establish a direct connection
to root canal and execute the following:
- Each connection creates a new device and attaches it to the link layer
- Link Layer packets are transmitted directly to the phy
This should be used for BLE connections.
This is used to directly connect various android emulators together.
For example a wear device can connect to an android emulator through
this.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def registerHCIDevice(self, request_iterator, context):
"""Connect the device to the emulated bluetooth chip. The device will
participate in the network. You can configure the chip to scan, advertise
and setup connections with other devices that are connected to the
network.
This is usually used when you have a need for an emulated bluetooth chip
and have a bluetooth stack that can interpret and handle the packets
correctly.
For example the apache nimble stack can use this endpoint as the
transport layer.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_EmulatedBluetoothServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'registerClassicPhy': grpc.stream_stream_rpc_method_handler(
servicer.registerClassicPhy,
request_deserializer=emulated__bluetooth__pb2.RawData.FromString,
response_serializer=emulated__bluetooth__pb2.RawData.SerializeToString,
),
'registerBlePhy': grpc.stream_stream_rpc_method_handler(
servicer.registerBlePhy,
request_deserializer=emulated__bluetooth__pb2.RawData.FromString,
response_serializer=emulated__bluetooth__pb2.RawData.SerializeToString,
),
'registerHCIDevice': grpc.stream_stream_rpc_method_handler(
servicer.registerHCIDevice,
request_deserializer=emulated__bluetooth__packets__pb2.HCIPacket.FromString,
response_serializer=emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'android.emulation.bluetooth.EmulatedBluetoothService', rpc_method_handlers
)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class EmulatedBluetoothService(object):
"""An Emulated Bluetooth Service exposes the emulated bluetooth chip from the
android emulator. It allows you to register emulated bluetooth devices and
control the packets that are exchanged between the device and the world.
This service enables you to establish a "virtual network" of emulated
bluetooth devices that can interact with each other.
Note: This is not yet finalized, it is likely that these definitions will
evolve.
"""
@staticmethod
def registerClassicPhy(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_stream(
request_iterator,
target,
'/android.emulation.bluetooth.EmulatedBluetoothService/registerClassicPhy',
emulated__bluetooth__pb2.RawData.SerializeToString,
emulated__bluetooth__pb2.RawData.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def registerBlePhy(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_stream(
request_iterator,
target,
'/android.emulation.bluetooth.EmulatedBluetoothService/registerBlePhy',
emulated__bluetooth__pb2.RawData.SerializeToString,
emulated__bluetooth__pb2.RawData.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def registerHCIDevice(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_stream(
request_iterator,
target,
'/android.emulation.bluetooth.EmulatedBluetoothService/registerHCIDevice',
emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
emulated__bluetooth__packets__pb2.HCIPacket.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)

View File

@@ -1,46 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth_vhci.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x1d\x65mulated_bluetooth_vhci.proto\x12\x1b\x61ndroid.emulation.bluetooth\x1a emulated_bluetooth_packets.proto2y\n\x15VhciForwardingService\x12`\n\nattachVhci\x12&.android.emulation.bluetooth.HCIPacket\x1a&.android.emulation.bluetooth.HCIPacket(\x01\x30\x01\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3'
)
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, 'emulated_bluetooth_vhci_pb2', globals()
)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_VHCIFORWARDINGSERVICE._serialized_start = 96
_VHCIFORWARDINGSERVICE._serialized_end = 217
# @@protoc_insertion_point(module_scope)

View File

@@ -1,19 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import emulated_bluetooth_packets_pb2 as _emulated_bluetooth_packets_pb2
from google.protobuf import descriptor as _descriptor
from typing import ClassVar as _ClassVar
DESCRIPTOR: _descriptor.FileDescriptor

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: common.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x63ommon.proto\x12\rnetsim.common*=\n\x08\x43hipKind\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\r\n\tBLUETOOTH\x10\x01\x12\x08\n\x04WIFI\x10\x02\x12\x07\n\x03UWB\x10\x03\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'common_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_CHIPKIND._serialized_start=31
_CHIPKIND._serialized_end=92
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,12 @@
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from typing import ClassVar as _ClassVar
BLUETOOTH: ChipKind
DESCRIPTOR: _descriptor.FileDescriptor
UNSPECIFIED: ChipKind
UWB: ChipKind
WIFI: ChipKind
class ChipKind(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,158 @@
from . import grpc_endpoint_description_pb2 as _grpc_endpoint_description_pb2
from google.protobuf import empty_pb2 as _empty_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class Advertisement(_message.Message):
__slots__ = ["connection_mode", "device_name", "discovery_mode"]
class ConnectionMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
class DiscoveryMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
CONNECTION_MODE_DIRECTED: Advertisement.ConnectionMode
CONNECTION_MODE_FIELD_NUMBER: _ClassVar[int]
CONNECTION_MODE_NON_CONNECTABLE: Advertisement.ConnectionMode
CONNECTION_MODE_UNDIRECTED: Advertisement.ConnectionMode
CONNECTION_MODE_UNSPECIFIED: Advertisement.ConnectionMode
DEVICE_NAME_FIELD_NUMBER: _ClassVar[int]
DISCOVERY_MODE_FIELD_NUMBER: _ClassVar[int]
DISCOVERY_MODE_GENERAL: Advertisement.DiscoveryMode
DISCOVERY_MODE_LIMITED: Advertisement.DiscoveryMode
DISCOVERY_MODE_NON_DISCOVERABLE: Advertisement.DiscoveryMode
DISCOVERY_MODE_UNSPECIFIED: Advertisement.DiscoveryMode
connection_mode: Advertisement.ConnectionMode
device_name: str
discovery_mode: Advertisement.DiscoveryMode
def __init__(self, device_name: _Optional[str] = ..., connection_mode: _Optional[_Union[Advertisement.ConnectionMode, str]] = ..., discovery_mode: _Optional[_Union[Advertisement.DiscoveryMode, str]] = ...) -> None: ...
class CallbackIdentifier(_message.Message):
__slots__ = ["identity"]
IDENTITY_FIELD_NUMBER: _ClassVar[int]
identity: str
def __init__(self, identity: _Optional[str] = ...) -> None: ...
class CharacteristicValueRequest(_message.Message):
__slots__ = ["callback_device_id", "callback_id", "data", "from_device"]
CALLBACK_DEVICE_ID_FIELD_NUMBER: _ClassVar[int]
CALLBACK_ID_FIELD_NUMBER: _ClassVar[int]
DATA_FIELD_NUMBER: _ClassVar[int]
FROM_DEVICE_FIELD_NUMBER: _ClassVar[int]
callback_device_id: CallbackIdentifier
callback_id: Uuid
data: bytes
from_device: DeviceIdentifier
def __init__(self, callback_device_id: _Optional[_Union[CallbackIdentifier, _Mapping]] = ..., from_device: _Optional[_Union[DeviceIdentifier, _Mapping]] = ..., callback_id: _Optional[_Union[Uuid, _Mapping]] = ..., data: _Optional[bytes] = ...) -> None: ...
class CharacteristicValueResponse(_message.Message):
__slots__ = ["data", "status"]
class GattStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
DATA_FIELD_NUMBER: _ClassVar[int]
GATT_STATUS_FAILURE: CharacteristicValueResponse.GattStatus
GATT_STATUS_SUCCESS: CharacteristicValueResponse.GattStatus
GATT_STATUS_UNSPECIFIED: CharacteristicValueResponse.GattStatus
STATUS_FIELD_NUMBER: _ClassVar[int]
data: bytes
status: CharacteristicValueResponse.GattStatus
def __init__(self, status: _Optional[_Union[CharacteristicValueResponse.GattStatus, str]] = ..., data: _Optional[bytes] = ...) -> None: ...
class ConnectionStateChange(_message.Message):
__slots__ = ["callback_device_id", "from_device", "new_state"]
class ConnectionState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
CALLBACK_DEVICE_ID_FIELD_NUMBER: _ClassVar[int]
CONNECTION_STATE_CONNECTED: ConnectionStateChange.ConnectionState
CONNECTION_STATE_DISCONNECTED: ConnectionStateChange.ConnectionState
CONNECTION_STATE_UNDEFINED: ConnectionStateChange.ConnectionState
FROM_DEVICE_FIELD_NUMBER: _ClassVar[int]
NEW_STATE_FIELD_NUMBER: _ClassVar[int]
callback_device_id: CallbackIdentifier
from_device: DeviceIdentifier
new_state: ConnectionStateChange.ConnectionState
def __init__(self, callback_device_id: _Optional[_Union[CallbackIdentifier, _Mapping]] = ..., from_device: _Optional[_Union[DeviceIdentifier, _Mapping]] = ..., new_state: _Optional[_Union[ConnectionStateChange.ConnectionState, str]] = ...) -> None: ...
class DeviceIdentifier(_message.Message):
__slots__ = ["address"]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
address: str
def __init__(self, address: _Optional[str] = ...) -> None: ...
class GattCharacteristic(_message.Message):
__slots__ = ["callback_id", "permissions", "properties", "uuid"]
class Permissions(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
class Properties(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
CALLBACK_ID_FIELD_NUMBER: _ClassVar[int]
PERMISSIONS_FIELD_NUMBER: _ClassVar[int]
PERMISSION_READ: GattCharacteristic.Permissions
PERMISSION_READ_ENCRYPTED: GattCharacteristic.Permissions
PERMISSION_READ_ENCRYPTED_MITM: GattCharacteristic.Permissions
PERMISSION_UNSPECIFIED: GattCharacteristic.Permissions
PERMISSION_WRITE: GattCharacteristic.Permissions
PERMISSION_WRITE_ENCRYPTED: GattCharacteristic.Permissions
PERMISSION_WRITE_ENCRYPTED_MITM: GattCharacteristic.Permissions
PERMISSION_WRITE_SIGNED: GattCharacteristic.Permissions
PERMISSION_WRITE_SIGNED_MITM: GattCharacteristic.Permissions
PROPERTIES_FIELD_NUMBER: _ClassVar[int]
PROPERTY_BROADCAST: GattCharacteristic.Properties
PROPERTY_EXTENDED_PROPS: GattCharacteristic.Properties
PROPERTY_INDICATE: GattCharacteristic.Properties
PROPERTY_NOTIFY: GattCharacteristic.Properties
PROPERTY_READ: GattCharacteristic.Properties
PROPERTY_SIGNED_WRITE: GattCharacteristic.Properties
PROPERTY_UNSPECIFIED: GattCharacteristic.Properties
PROPERTY_WRITE: GattCharacteristic.Properties
PROPERTY_WRITE_NO_RESPONSE: GattCharacteristic.Properties
UUID_FIELD_NUMBER: _ClassVar[int]
callback_id: Uuid
permissions: int
properties: int
uuid: Uuid
def __init__(self, uuid: _Optional[_Union[Uuid, _Mapping]] = ..., properties: _Optional[int] = ..., permissions: _Optional[int] = ..., callback_id: _Optional[_Union[Uuid, _Mapping]] = ...) -> None: ...
class GattDevice(_message.Message):
__slots__ = ["advertisement", "endpoint", "profile"]
ADVERTISEMENT_FIELD_NUMBER: _ClassVar[int]
ENDPOINT_FIELD_NUMBER: _ClassVar[int]
PROFILE_FIELD_NUMBER: _ClassVar[int]
advertisement: Advertisement
endpoint: _grpc_endpoint_description_pb2.Endpoint
profile: GattProfile
def __init__(self, endpoint: _Optional[_Union[_grpc_endpoint_description_pb2.Endpoint, _Mapping]] = ..., advertisement: _Optional[_Union[Advertisement, _Mapping]] = ..., profile: _Optional[_Union[GattProfile, _Mapping]] = ...) -> None: ...
class GattProfile(_message.Message):
__slots__ = ["services"]
SERVICES_FIELD_NUMBER: _ClassVar[int]
services: _containers.RepeatedCompositeFieldContainer[GattService]
def __init__(self, services: _Optional[_Iterable[_Union[GattService, _Mapping]]] = ...) -> None: ...
class GattService(_message.Message):
__slots__ = ["characteristics", "service_type", "uuid"]
class ServiceType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
CHARACTERISTICS_FIELD_NUMBER: _ClassVar[int]
SERVICE_TYPE_FIELD_NUMBER: _ClassVar[int]
SERVICE_TYPE_PRIMARY: GattService.ServiceType
SERVICE_TYPE_SECONDARY: GattService.ServiceType
SERVICE_TYPE_UNSPECIFIED: GattService.ServiceType
UUID_FIELD_NUMBER: _ClassVar[int]
characteristics: _containers.RepeatedCompositeFieldContainer[GattCharacteristic]
service_type: GattService.ServiceType
uuid: Uuid
def __init__(self, uuid: _Optional[_Union[Uuid, _Mapping]] = ..., service_type: _Optional[_Union[GattService.ServiceType, str]] = ..., characteristics: _Optional[_Iterable[_Union[GattCharacteristic, _Mapping]]] = ...) -> None: ...
class Uuid(_message.Message):
__slots__ = ["id", "lsb", "msb"]
ID_FIELD_NUMBER: _ClassVar[int]
LSB_FIELD_NUMBER: _ClassVar[int]
MSB_FIELD_NUMBER: _ClassVar[int]
id: int
lsb: int
msb: int
def __init__(self, id: _Optional[int] = ..., lsb: _Optional[int] = ..., msb: _Optional[int] = ...) -> None: ...

View File

@@ -0,0 +1,193 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import emulated_bluetooth_device_pb2 as emulated__bluetooth__device__pb2
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
class GattDeviceServiceStub(object):
"""You can provide your own GattDevice by implementing this service
and registering it with the android emulator.
The device will appear as a real bluetooth device, and you will
receive callbacks when the bluetooth system wants to
read, write or observe a characteristic.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.OnCharacteristicReadRequest = channel.unary_unary(
'/android.emulation.bluetooth.GattDeviceService/OnCharacteristicReadRequest',
request_serializer=emulated__bluetooth__device__pb2.CharacteristicValueRequest.SerializeToString,
response_deserializer=emulated__bluetooth__device__pb2.CharacteristicValueResponse.FromString,
)
self.OnCharacteristicWriteRequest = channel.unary_unary(
'/android.emulation.bluetooth.GattDeviceService/OnCharacteristicWriteRequest',
request_serializer=emulated__bluetooth__device__pb2.CharacteristicValueRequest.SerializeToString,
response_deserializer=emulated__bluetooth__device__pb2.CharacteristicValueResponse.FromString,
)
self.OnCharacteristicObserveRequest = channel.unary_stream(
'/android.emulation.bluetooth.GattDeviceService/OnCharacteristicObserveRequest',
request_serializer=emulated__bluetooth__device__pb2.CharacteristicValueRequest.SerializeToString,
response_deserializer=emulated__bluetooth__device__pb2.CharacteristicValueResponse.FromString,
)
self.OnConnectionStateChange = channel.unary_unary(
'/android.emulation.bluetooth.GattDeviceService/OnConnectionStateChange',
request_serializer=emulated__bluetooth__device__pb2.ConnectionStateChange.SerializeToString,
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
)
class GattDeviceServiceServicer(object):
"""You can provide your own GattDevice by implementing this service
and registering it with the android emulator.
The device will appear as a real bluetooth device, and you will
receive callbacks when the bluetooth system wants to
read, write or observe a characteristic.
"""
def OnCharacteristicReadRequest(self, request, context):
"""A remote client has requested to read a local characteristic.
Return the current observed value.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def OnCharacteristicWriteRequest(self, request, context):
"""A remote client has requested to write to a local characteristic.
Return the current observed value.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def OnCharacteristicObserveRequest(self, request, context):
"""Listens for notifications from the emulated device, the device should
write to the stream with a response when a change has occurred.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def OnConnectionStateChange(self, request, context):
"""A remote device has been connected or disconnected.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_GattDeviceServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'OnCharacteristicReadRequest': grpc.unary_unary_rpc_method_handler(
servicer.OnCharacteristicReadRequest,
request_deserializer=emulated__bluetooth__device__pb2.CharacteristicValueRequest.FromString,
response_serializer=emulated__bluetooth__device__pb2.CharacteristicValueResponse.SerializeToString,
),
'OnCharacteristicWriteRequest': grpc.unary_unary_rpc_method_handler(
servicer.OnCharacteristicWriteRequest,
request_deserializer=emulated__bluetooth__device__pb2.CharacteristicValueRequest.FromString,
response_serializer=emulated__bluetooth__device__pb2.CharacteristicValueResponse.SerializeToString,
),
'OnCharacteristicObserveRequest': grpc.unary_stream_rpc_method_handler(
servicer.OnCharacteristicObserveRequest,
request_deserializer=emulated__bluetooth__device__pb2.CharacteristicValueRequest.FromString,
response_serializer=emulated__bluetooth__device__pb2.CharacteristicValueResponse.SerializeToString,
),
'OnConnectionStateChange': grpc.unary_unary_rpc_method_handler(
servicer.OnConnectionStateChange,
request_deserializer=emulated__bluetooth__device__pb2.ConnectionStateChange.FromString,
response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'android.emulation.bluetooth.GattDeviceService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class GattDeviceService(object):
"""You can provide your own GattDevice by implementing this service
and registering it with the android emulator.
The device will appear as a real bluetooth device, and you will
receive callbacks when the bluetooth system wants to
read, write or observe a characteristic.
"""
@staticmethod
def OnCharacteristicReadRequest(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/android.emulation.bluetooth.GattDeviceService/OnCharacteristicReadRequest',
emulated__bluetooth__device__pb2.CharacteristicValueRequest.SerializeToString,
emulated__bluetooth__device__pb2.CharacteristicValueResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def OnCharacteristicWriteRequest(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/android.emulation.bluetooth.GattDeviceService/OnCharacteristicWriteRequest',
emulated__bluetooth__device__pb2.CharacteristicValueRequest.SerializeToString,
emulated__bluetooth__device__pb2.CharacteristicValueResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def OnCharacteristicObserveRequest(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(request, target, '/android.emulation.bluetooth.GattDeviceService/OnCharacteristicObserveRequest',
emulated__bluetooth__device__pb2.CharacteristicValueRequest.SerializeToString,
emulated__bluetooth__device__pb2.CharacteristicValueResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def OnConnectionStateChange(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/android.emulation.bluetooth.GattDeviceService/OnConnectionStateChange',
emulated__bluetooth__device__pb2.ConnectionStateChange.SerializeToString,
google_dot_protobuf_dot_empty__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth_packets.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n emulated_bluetooth_packets.proto\x12\x1b\x61ndroid.emulation.bluetooth\"\xfb\x01\n\tHCIPacket\x12?\n\x04type\x18\x01 \x01(\x0e\x32\x31.android.emulation.bluetooth.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"\x9c\x01\n\nPacketType\x12\x1b\n\x17PACKET_TYPE_UNSPECIFIED\x10\x00\x12\x1b\n\x17PACKET_TYPE_HCI_COMMAND\x10\x01\x12\x13\n\x0fPACKET_TYPE_ACL\x10\x02\x12\x13\n\x0fPACKET_TYPE_SCO\x10\x03\x12\x15\n\x11PACKET_TYPE_EVENT\x10\x04\x12\x13\n\x0fPACKET_TYPE_ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'emulated_bluetooth_packets_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_HCIPACKET._serialized_start=66
_HCIPACKET._serialized_end=317
_HCIPACKET_PACKETTYPE._serialized_start=161
_HCIPACKET_PACKETTYPE._serialized_end=317
# @@protoc_insertion_point(module_scope)

View File

@@ -1,17 +1,3 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
@@ -21,7 +7,6 @@ DESCRIPTOR: _descriptor.FileDescriptor
class HCIPacket(_message.Message):
__slots__ = ["packet", "type"]
class PacketType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
PACKET_FIELD_NUMBER: _ClassVar[int]
@@ -34,8 +19,4 @@ class HCIPacket(_message.Message):
TYPE_FIELD_NUMBER: _ClassVar[int]
packet: bytes
type: HCIPacket.PacketType
def __init__(
self,
type: _Optional[_Union[HCIPacket.PacketType, str]] = ...,
packet: _Optional[bytes] = ...,
) -> None: ...
def __init__(self, type: _Optional[_Union[HCIPacket.PacketType, str]] = ..., packet: _Optional[bytes] = ...) -> None: ...

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
from . import emulated_bluetooth_device_pb2 as emulated__bluetooth__device__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18\x65mulated_bluetooth.proto\x12\x1b\x61ndroid.emulation.bluetooth\x1a emulated_bluetooth_packets.proto\x1a\x1f\x65mulated_bluetooth_device.proto\"\x19\n\x07RawData\x12\x0e\n\x06packet\x18\x01 \x01(\x0c\"a\n\x12RegistrationStatus\x12K\n\x12\x63\x61llback_device_id\x18\x01 \x01(\x0b\x32/.android.emulation.bluetooth.CallbackIdentifier2\xbb\x03\n\x18\x45mulatedBluetoothService\x12\x64\n\x12registerClassicPhy\x12$.android.emulation.bluetooth.RawData\x1a$.android.emulation.bluetooth.RawData(\x01\x30\x01\x12`\n\x0eregisterBlePhy\x12$.android.emulation.bluetooth.RawData\x1a$.android.emulation.bluetooth.RawData(\x01\x30\x01\x12g\n\x11registerHCIDevice\x12&.android.emulation.bluetooth.HCIPacket\x1a&.android.emulation.bluetooth.HCIPacket(\x01\x30\x01\x12n\n\x12registerGattDevice\x12\'.android.emulation.bluetooth.GattDevice\x1a/.android.emulation.bluetooth.RegistrationStatusB\"\n\x1e\x63om.android.emulator.bluetoothP\x01\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'emulated_bluetooth_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\036com.android.emulator.bluetoothP\001'
_RAWDATA._serialized_start=124
_RAWDATA._serialized_end=149
_REGISTRATIONSTATUS._serialized_start=151
_REGISTRATIONSTATUS._serialized_end=248
_EMULATEDBLUETOOTHSERVICE._serialized_start=251
_EMULATEDBLUETOOTHSERVICE._serialized_end=694
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,19 @@
from . import emulated_bluetooth_packets_pb2 as _emulated_bluetooth_packets_pb2
from . import emulated_bluetooth_device_pb2 as _emulated_bluetooth_device_pb2
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class RawData(_message.Message):
__slots__ = ["packet"]
PACKET_FIELD_NUMBER: _ClassVar[int]
packet: bytes
def __init__(self, packet: _Optional[bytes] = ...) -> None: ...
class RegistrationStatus(_message.Message):
__slots__ = ["callback_device_id"]
CALLBACK_DEVICE_ID_FIELD_NUMBER: _ClassVar[int]
callback_device_id: _emulated_bluetooth_device_pb2.CallbackIdentifier
def __init__(self, callback_device_id: _Optional[_Union[_emulated_bluetooth_device_pb2.CallbackIdentifier, _Mapping]] = ...) -> None: ...

View File

@@ -0,0 +1,237 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import emulated_bluetooth_device_pb2 as emulated__bluetooth__device__pb2
from . import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
from . import emulated_bluetooth_pb2 as emulated__bluetooth__pb2
class EmulatedBluetoothServiceStub(object):
"""An Emulated Bluetooth Service exposes the emulated bluetooth chip from the
android emulator. It allows you to register emulated bluetooth devices and
control the packets that are exchanged between the device and the world.
This service enables you to establish a "virtual network" of emulated
bluetooth devices that can interact with each other.
Note: This is not yet finalized, it is likely that these definitions will
evolve.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.registerClassicPhy = channel.stream_stream(
'/android.emulation.bluetooth.EmulatedBluetoothService/registerClassicPhy',
request_serializer=emulated__bluetooth__pb2.RawData.SerializeToString,
response_deserializer=emulated__bluetooth__pb2.RawData.FromString,
)
self.registerBlePhy = channel.stream_stream(
'/android.emulation.bluetooth.EmulatedBluetoothService/registerBlePhy',
request_serializer=emulated__bluetooth__pb2.RawData.SerializeToString,
response_deserializer=emulated__bluetooth__pb2.RawData.FromString,
)
self.registerHCIDevice = channel.stream_stream(
'/android.emulation.bluetooth.EmulatedBluetoothService/registerHCIDevice',
request_serializer=emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
response_deserializer=emulated__bluetooth__packets__pb2.HCIPacket.FromString,
)
self.registerGattDevice = channel.unary_unary(
'/android.emulation.bluetooth.EmulatedBluetoothService/registerGattDevice',
request_serializer=emulated__bluetooth__device__pb2.GattDevice.SerializeToString,
response_deserializer=emulated__bluetooth__pb2.RegistrationStatus.FromString,
)
class EmulatedBluetoothServiceServicer(object):
"""An Emulated Bluetooth Service exposes the emulated bluetooth chip from the
android emulator. It allows you to register emulated bluetooth devices and
control the packets that are exchanged between the device and the world.
This service enables you to establish a "virtual network" of emulated
bluetooth devices that can interact with each other.
Note: This is not yet finalized, it is likely that these definitions will
evolve.
"""
def registerClassicPhy(self, request_iterator, context):
"""Connect device to link layer. This will establish a direct connection
to the emulated bluetooth chip and configure the following:
- Each connection creates a new device and attaches it to the link layer
- Link Layer packets are transmitted directly to the phy
This should be used for classic connections.
This is used to directly connect various android emulators together.
For example a wear device can connect to an android emulator through
this.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def registerBlePhy(self, request_iterator, context):
"""Connect device to link layer. This will establish a direct connection
to root canal and execute the following:
- Each connection creates a new device and attaches it to the link layer
- Link Layer packets are transmitted directly to the phy
This should be used for BLE connections.
This is used to directly connect various android emulators together.
For example a wear device can connect to an android emulator through
this.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def registerHCIDevice(self, request_iterator, context):
"""Connect the device to the emulated bluetooth chip. The device will
participate in the network. You can configure the chip to scan, advertise
and setup connections with other devices that are connected to the
network.
This is usually used when you have a need for an emulated bluetooth chip
and have a bluetooth stack that can interpret and handle the packets
correctly.
For example the apache nimble stack can use this endpoint as the
transport layer.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def registerGattDevice(self, request, context):
"""Registers an emulated bluetooth device. The emulator will reach out to
the emulated device to read/write and subscribe to properties.
The following gRPC error codes can be returned:
- FAILED_PRECONDITION (code 9):
- root canal is not available on this device
- unable to reach the endpoint for the GattDevice
- INTERNAL (code 13) if there was an internal emulator failure.
The device will not be discoverable in case of an error.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_EmulatedBluetoothServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'registerClassicPhy': grpc.stream_stream_rpc_method_handler(
servicer.registerClassicPhy,
request_deserializer=emulated__bluetooth__pb2.RawData.FromString,
response_serializer=emulated__bluetooth__pb2.RawData.SerializeToString,
),
'registerBlePhy': grpc.stream_stream_rpc_method_handler(
servicer.registerBlePhy,
request_deserializer=emulated__bluetooth__pb2.RawData.FromString,
response_serializer=emulated__bluetooth__pb2.RawData.SerializeToString,
),
'registerHCIDevice': grpc.stream_stream_rpc_method_handler(
servicer.registerHCIDevice,
request_deserializer=emulated__bluetooth__packets__pb2.HCIPacket.FromString,
response_serializer=emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
),
'registerGattDevice': grpc.unary_unary_rpc_method_handler(
servicer.registerGattDevice,
request_deserializer=emulated__bluetooth__device__pb2.GattDevice.FromString,
response_serializer=emulated__bluetooth__pb2.RegistrationStatus.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'android.emulation.bluetooth.EmulatedBluetoothService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class EmulatedBluetoothService(object):
"""An Emulated Bluetooth Service exposes the emulated bluetooth chip from the
android emulator. It allows you to register emulated bluetooth devices and
control the packets that are exchanged between the device and the world.
This service enables you to establish a "virtual network" of emulated
bluetooth devices that can interact with each other.
Note: This is not yet finalized, it is likely that these definitions will
evolve.
"""
@staticmethod
def registerClassicPhy(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/android.emulation.bluetooth.EmulatedBluetoothService/registerClassicPhy',
emulated__bluetooth__pb2.RawData.SerializeToString,
emulated__bluetooth__pb2.RawData.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def registerBlePhy(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/android.emulation.bluetooth.EmulatedBluetoothService/registerBlePhy',
emulated__bluetooth__pb2.RawData.SerializeToString,
emulated__bluetooth__pb2.RawData.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def registerHCIDevice(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/android.emulation.bluetooth.EmulatedBluetoothService/registerHCIDevice',
emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
emulated__bluetooth__packets__pb2.HCIPacket.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def registerGattDevice(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/android.emulation.bluetooth.EmulatedBluetoothService/registerGattDevice',
emulated__bluetooth__device__pb2.GattDevice.SerializeToString,
emulated__bluetooth__pb2.RegistrationStatus.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth_vhci.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1d\x65mulated_bluetooth_vhci.proto\x12\x1b\x61ndroid.emulation.bluetooth\x1a emulated_bluetooth_packets.proto2y\n\x15VhciForwardingService\x12`\n\nattachVhci\x12&.android.emulation.bluetooth.HCIPacket\x1a&.android.emulation.bluetooth.HCIPacket(\x01\x30\x01\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'emulated_bluetooth_vhci_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_VHCIFORWARDINGSERVICE._serialized_start=96
_VHCIFORWARDINGSERVICE._serialized_end=217
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,5 @@
import emulated_bluetooth_packets_pb2 as _emulated_bluetooth_packets_pb2
from google.protobuf import descriptor as _descriptor
from typing import ClassVar as _ClassVar
DESCRIPTOR: _descriptor.FileDescriptor

View File

@@ -1,17 +1,3 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
@@ -35,10 +21,10 @@ class VhciForwardingServiceStub(object):
channel: A grpc.Channel.
"""
self.attachVhci = channel.stream_stream(
'/android.emulation.bluetooth.VhciForwardingService/attachVhci',
request_serializer=emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
response_deserializer=emulated__bluetooth__packets__pb2.HCIPacket.FromString,
)
'/android.emulation.bluetooth.VhciForwardingService/attachVhci',
request_serializer=emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
response_deserializer=emulated__bluetooth__packets__pb2.HCIPacket.FromString,
)
class VhciForwardingServiceServicer(object):
@@ -75,19 +61,18 @@ class VhciForwardingServiceServicer(object):
def add_VhciForwardingServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'attachVhci': grpc.stream_stream_rpc_method_handler(
servicer.attachVhci,
request_deserializer=emulated__bluetooth__packets__pb2.HCIPacket.FromString,
response_serializer=emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
),
'attachVhci': grpc.stream_stream_rpc_method_handler(
servicer.attachVhci,
request_deserializer=emulated__bluetooth__packets__pb2.HCIPacket.FromString,
response_serializer=emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'android.emulation.bluetooth.VhciForwardingService', rpc_method_handlers
)
'android.emulation.bluetooth.VhciForwardingService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
# This class is part of an EXPERIMENTAL API.
class VhciForwardingService(object):
"""This is a service which allows you to directly intercept the VHCI packets
that are coming and going to the device before they are delivered to
@@ -98,30 +83,18 @@ class VhciForwardingService(object):
"""
@staticmethod
def attachVhci(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_stream(
request_iterator,
def attachVhci(request_iterator,
target,
'/android.emulation.bluetooth.VhciForwardingService/attachVhci',
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/android.emulation.bluetooth.VhciForwardingService/attachVhci',
emulated__bluetooth__packets__pb2.HCIPacket.SerializeToString,
emulated__bluetooth__packets__pb2.HCIPacket.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: grpc_endpoint_description.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fgrpc_endpoint_description.proto\x12\x18\x61ndroid.emulation.remote\"V\n\x0b\x43redentials\x12\x16\n\x0epem_root_certs\x18\x01 \x01(\t\x12\x17\n\x0fpem_private_key\x18\x02 \x01(\t\x12\x16\n\x0epem_cert_chain\x18\x03 \x01(\t\"$\n\x06Header\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x96\x01\n\x08\x45ndpoint\x12\x0e\n\x06target\x18\x01 \x01(\t\x12>\n\x0ftls_credentials\x18\x02 \x01(\x0b\x32%.android.emulation.remote.Credentials\x12:\n\x10required_headers\x18\x03 \x03(\x0b\x32 .android.emulation.remote.HeaderB \n\x1c\x63om.android.emulation.remoteP\x01\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'grpc_endpoint_description_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\034com.android.emulation.remoteP\001'
_CREDENTIALS._serialized_start=61
_CREDENTIALS._serialized_end=147
_HEADER._serialized_start=149
_HEADER._serialized_end=185
_ENDPOINT._serialized_start=188
_ENDPOINT._serialized_end=338
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,34 @@
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class Credentials(_message.Message):
__slots__ = ["pem_cert_chain", "pem_private_key", "pem_root_certs"]
PEM_CERT_CHAIN_FIELD_NUMBER: _ClassVar[int]
PEM_PRIVATE_KEY_FIELD_NUMBER: _ClassVar[int]
PEM_ROOT_CERTS_FIELD_NUMBER: _ClassVar[int]
pem_cert_chain: str
pem_private_key: str
pem_root_certs: str
def __init__(self, pem_root_certs: _Optional[str] = ..., pem_private_key: _Optional[str] = ..., pem_cert_chain: _Optional[str] = ...) -> None: ...
class Endpoint(_message.Message):
__slots__ = ["required_headers", "target", "tls_credentials"]
REQUIRED_HEADERS_FIELD_NUMBER: _ClassVar[int]
TARGET_FIELD_NUMBER: _ClassVar[int]
TLS_CREDENTIALS_FIELD_NUMBER: _ClassVar[int]
required_headers: _containers.RepeatedCompositeFieldContainer[Header]
target: str
tls_credentials: Credentials
def __init__(self, target: _Optional[str] = ..., tls_credentials: _Optional[_Union[Credentials, _Mapping]] = ..., required_headers: _Optional[_Iterable[_Union[Header, _Mapping]]] = ...) -> None: ...
class Header(_message.Message):
__slots__ = ["key", "value"]
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: hci_packet.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10hci_packet.proto\x12\rnetsim.packet\"\xb2\x01\n\tHCIPacket\x12\x38\n\x0bpacket_type\x18\x01 \x01(\x0e\x32#.netsim.packet.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"[\n\nPacketType\x12\x1a\n\x16HCI_PACKET_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x43OMMAND\x10\x01\x12\x07\n\x03\x41\x43L\x10\x02\x12\x07\n\x03SCO\x10\x03\x12\t\n\x05\x45VENT\x10\x04\x12\x07\n\x03ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'hci_packet_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_HCIPACKET._serialized_start=36
_HCIPACKET._serialized_end=214
_HCIPACKET_PACKETTYPE._serialized_start=123
_HCIPACKET_PACKETTYPE._serialized_end=214
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,22 @@
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class HCIPacket(_message.Message):
__slots__ = ["packet", "packet_type"]
class PacketType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
ACL: HCIPacket.PacketType
COMMAND: HCIPacket.PacketType
EVENT: HCIPacket.PacketType
HCI_PACKET_UNSPECIFIED: HCIPacket.PacketType
ISO: HCIPacket.PacketType
PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_TYPE_FIELD_NUMBER: _ClassVar[int]
SCO: HCIPacket.PacketType
packet: bytes
packet_type: HCIPacket.PacketType
def __init__(self, packet_type: _Optional[_Union[HCIPacket.PacketType, str]] = ..., packet: _Optional[bytes] = ...) -> None: ...

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: packet_streamer.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import hci_packet_pb2 as hci__packet__pb2
from . import startup_pb2 as startup__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15packet_streamer.proto\x12\rnetsim.packet\x1a\x10hci_packet.proto\x1a\rstartup.proto\"\x93\x01\n\rPacketRequest\x12\x30\n\x0cinitial_info\x18\x01 \x01(\x0b\x32\x18.netsim.startup.ChipInfoH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0e\n\x0crequest_type\"t\n\x0ePacketResponse\x12\x0f\n\x05\x65rror\x18\x01 \x01(\tH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0f\n\rresponse_type2b\n\x0ePacketStreamer\x12P\n\rStreamPackets\x12\x1c.netsim.packet.PacketRequest\x1a\x1d.netsim.packet.PacketResponse(\x01\x30\x01\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'packet_streamer_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_PACKETREQUEST._serialized_start=74
_PACKETREQUEST._serialized_end=221
_PACKETRESPONSE._serialized_start=223
_PACKETRESPONSE._serialized_end=339
_PACKETSTREAMER._serialized_start=341
_PACKETSTREAMER._serialized_end=439
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,27 @@
from . import hci_packet_pb2 as _hci_packet_pb2
from . import startup_pb2 as _startup_pb2
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class PacketRequest(_message.Message):
__slots__ = ["hci_packet", "initial_info", "packet"]
HCI_PACKET_FIELD_NUMBER: _ClassVar[int]
INITIAL_INFO_FIELD_NUMBER: _ClassVar[int]
PACKET_FIELD_NUMBER: _ClassVar[int]
hci_packet: _hci_packet_pb2.HCIPacket
initial_info: _startup_pb2.ChipInfo
packet: bytes
def __init__(self, initial_info: _Optional[_Union[_startup_pb2.ChipInfo, _Mapping]] = ..., hci_packet: _Optional[_Union[_hci_packet_pb2.HCIPacket, _Mapping]] = ..., packet: _Optional[bytes] = ...) -> None: ...
class PacketResponse(_message.Message):
__slots__ = ["error", "hci_packet", "packet"]
ERROR_FIELD_NUMBER: _ClassVar[int]
HCI_PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_FIELD_NUMBER: _ClassVar[int]
error: str
hci_packet: _hci_packet_pb2.HCIPacket
packet: bytes
def __init__(self, error: _Optional[str] = ..., hci_packet: _Optional[_Union[_hci_packet_pb2.HCIPacket, _Mapping]] = ..., packet: _Optional[bytes] = ...) -> None: ...

View File

@@ -0,0 +1,109 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import packet_streamer_pb2 as packet__streamer__pb2
class PacketStreamerStub(object):
"""*
This is the packet service for the network simulator.
Android Virtual Devices (AVDs) and accessory devices use this service to
connect to the network simulator and pass packets back and forth.
AVDs running in a guest VM are built with virtual controllers for each radio
chip. These controllers route chip requests to host emulators (qemu and
crosvm) using virtio and from there they are forwarded to this gRpc service.
This setup provides a transparent radio environment across AVDs and
accessories because the network simulator contains libraries to emulate
Bluetooth, 80211MAC, UWB, and Rtt chips.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.StreamPackets = channel.stream_stream(
'/netsim.packet.PacketStreamer/StreamPackets',
request_serializer=packet__streamer__pb2.PacketRequest.SerializeToString,
response_deserializer=packet__streamer__pb2.PacketResponse.FromString,
)
class PacketStreamerServicer(object):
"""*
This is the packet service for the network simulator.
Android Virtual Devices (AVDs) and accessory devices use this service to
connect to the network simulator and pass packets back and forth.
AVDs running in a guest VM are built with virtual controllers for each radio
chip. These controllers route chip requests to host emulators (qemu and
crosvm) using virtio and from there they are forwarded to this gRpc service.
This setup provides a transparent radio environment across AVDs and
accessories because the network simulator contains libraries to emulate
Bluetooth, 80211MAC, UWB, and Rtt chips.
"""
def StreamPackets(self, request_iterator, context):
"""Attach a virtual radio controller to the network simulation.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_PacketStreamerServicer_to_server(servicer, server):
rpc_method_handlers = {
'StreamPackets': grpc.stream_stream_rpc_method_handler(
servicer.StreamPackets,
request_deserializer=packet__streamer__pb2.PacketRequest.FromString,
response_serializer=packet__streamer__pb2.PacketResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'netsim.packet.PacketStreamer', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class PacketStreamer(object):
"""*
This is the packet service for the network simulator.
Android Virtual Devices (AVDs) and accessory devices use this service to
connect to the network simulator and pass packets back and forth.
AVDs running in a guest VM are built with virtual controllers for each radio
chip. These controllers route chip requests to host emulators (qemu and
crosvm) using virtio and from there they are forwarded to this gRpc service.
This setup provides a transparent radio environment across AVDs and
accessories because the network simulator contains libraries to emulate
Bluetooth, 80211MAC, UWB, and Rtt chips.
"""
@staticmethod
def StreamPackets(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/netsim.packet.PacketStreamer/StreamPackets',
packet__streamer__pb2.PacketRequest.SerializeToString,
packet__streamer__pb2.PacketResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: startup.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import common_pb2 as common__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rstartup.proto\x12\x0enetsim.startup\x1a\x0c\x63ommon.proto\"\x7f\n\x0bStartupInfo\x12\x33\n\x07\x64\x65vices\x18\x01 \x03(\x0b\x32\".netsim.startup.StartupInfo.Device\x1a;\n\x06\x44\x65vice\x12\x0c\n\x04name\x18\x01 \x01(\t\x12#\n\x05\x63hips\x18\x02 \x03(\x0b\x32\x14.netsim.startup.Chip\"<\n\x08\x43hipInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\"\n\x04\x63hip\x18\x02 \x01(\x0b\x32\x14.netsim.startup.Chip\"\x96\x01\n\x04\x43hip\x12%\n\x04kind\x18\x01 \x01(\x0e\x32\x17.netsim.common.ChipKind\x12\n\n\x02id\x18\x02 \x01(\t\x12\x14\n\x0cmanufacturer\x18\x03 \x01(\t\x12\x14\n\x0cproduct_name\x18\x04 \x01(\t\x12\r\n\x05\x66\x64_in\x18\x05 \x01(\x05\x12\x0e\n\x06\x66\x64_out\x18\x06 \x01(\x05\x12\x10\n\x08loopback\x18\x07 \x01(\x08\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'startup_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_STARTUPINFO._serialized_start=47
_STARTUPINFO._serialized_end=174
_STARTUPINFO_DEVICE._serialized_start=115
_STARTUPINFO_DEVICE._serialized_end=174
_CHIPINFO._serialized_start=176
_CHIPINFO._serialized_end=236
_CHIP._serialized_start=239
_CHIP._serialized_end=389
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,46 @@
from . import common_pb2 as _common_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class Chip(_message.Message):
__slots__ = ["fd_in", "fd_out", "id", "kind", "loopback", "manufacturer", "product_name"]
FD_IN_FIELD_NUMBER: _ClassVar[int]
FD_OUT_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
KIND_FIELD_NUMBER: _ClassVar[int]
LOOPBACK_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
fd_in: int
fd_out: int
id: str
kind: _common_pb2.ChipKind
loopback: bool
manufacturer: str
product_name: str
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., id: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., fd_in: _Optional[int] = ..., fd_out: _Optional[int] = ..., loopback: bool = ...) -> None: ...
class ChipInfo(_message.Message):
__slots__ = ["chip", "name"]
CHIP_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
chip: Chip
name: str
def __init__(self, name: _Optional[str] = ..., chip: _Optional[_Union[Chip, _Mapping]] = ...) -> None: ...
class StartupInfo(_message.Message):
__slots__ = ["devices"]
class Device(_message.Message):
__slots__ = ["chips", "name"]
CHIPS_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
chips: _containers.RepeatedCompositeFieldContainer[Chip]
name: str
def __init__(self, name: _Optional[str] = ..., chips: _Optional[_Iterable[_Union[Chip, _Mapping]]] = ...) -> None: ...
DEVICES_FIELD_NUMBER: _ClassVar[int]
devices: _containers.RepeatedCompositeFieldContainer[StartupInfo.Device]
def __init__(self, devices: _Optional[_Iterable[_Union[StartupInfo.Device, _Mapping]]] = ...) -> None: ...

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc