update to new netsim proto with DeviceInfo

This commit is contained in:
Gilles Boccon-Gibod
2024-10-22 11:48:42 -07:00
parent 1de7d2cd6f
commit 7584daa3f9
26 changed files with 684 additions and 188 deletions

View File

@@ -20,12 +20,14 @@ import atexit
import logging
import os
import pathlib
import platform
import sys
from typing import Dict, Optional
import grpc.aio
from .common import (
import bumble
from bumble.transport.common import (
ParserSource,
PumpedTransport,
PumpedPacketSource,
@@ -36,15 +38,15 @@ from .common import (
)
# pylint: disable=no-name-in-module
from .grpc_protobuf.packet_streamer_pb2_grpc import (
from .grpc_protobuf.netsim.packet_streamer_pb2_grpc import (
PacketStreamerStub,
PacketStreamerServicer,
add_PacketStreamerServicer_to_server,
)
from .grpc_protobuf.packet_streamer_pb2 import PacketRequest, PacketResponse
from .grpc_protobuf.hci_packet_pb2 import HCIPacket
from .grpc_protobuf.startup_pb2 import Chip, ChipInfo
from .grpc_protobuf.common_pb2 import ChipKind
from .grpc_protobuf.netsim.packet_streamer_pb2 import PacketRequest, PacketResponse
from .grpc_protobuf.netsim.hci_packet_pb2 import HCIPacket
from .grpc_protobuf.netsim.startup_pb2 import Chip, ChipInfo, DeviceInfo
from .grpc_protobuf.netsim.common_pb2 import ChipKind
# -----------------------------------------------------------------------------
@@ -199,7 +201,6 @@ async def open_android_netsim_controller_transport(
data = (
bytes([request.hci_packet.packet_type]) + request.hci_packet.packet
)
logger.debug(f'<<< PACKET: {data.hex()}')
self.on_data_received(data)
async def send_packet(self, data):
@@ -253,7 +254,7 @@ async def open_android_netsim_controller_transport(
# Check that we don't already have a device
if self.device:
logger.debug('busy, already serving a device')
logger.debug('Busy, already serving a device')
return PacketResponse(error='Busy')
# Instantiate a new device
@@ -318,10 +319,16 @@ async def open_android_netsim_host_transport_with_channel(
self.hci_device = hci_device
async def start(self): # Send the startup info
chip_info = ChipInfo(
device_info = DeviceInfo(
name=self.name,
chip=Chip(kind=ChipKind.BLUETOOTH, manufacturer=self.manufacturer),
kind='Bumble',
version=bumble.__version__,
sdk_version=platform.python_version(),
build_id=platform.platform(),
arch=platform.machine(),
)
chip = Chip(kind=ChipKind.BLUETOOTH, manufacturer=self.manufacturer)
chip_info = ChipInfo(name=self.name, chip=chip, device_info=device_info)
logger.debug(f'Sending chip info to netsim: {chip_info}')
await self.hci_device.write(PacketRequest(initial_info=chip_info))

View File

@@ -1,28 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: hci_packet.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10hci_packet.proto\x12\rnetsim.packet\"\xb2\x01\n\tHCIPacket\x12\x38\n\x0bpacket_type\x18\x01 \x01(\x0e\x32#.netsim.packet.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"[\n\nPacketType\x12\x1a\n\x16HCI_PACKET_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x43OMMAND\x10\x01\x12\x07\n\x03\x41\x43L\x10\x02\x12\x07\n\x03SCO\x10\x03\x12\t\n\x05\x45VENT\x10\x04\x12\x07\n\x03ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'hci_packet_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_HCIPACKET._serialized_start=36
_HCIPACKET._serialized_end=214
_HCIPACKET_PACKETTYPE._serialized_start=123
_HCIPACKET_PACKETTYPE._serialized_end=214
# @@protoc_insertion_point(module_scope)

View File

@@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: common.proto
# source: netsim/common.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@@ -13,13 +14,13 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x63ommon.proto\x12\rnetsim.common*=\n\x08\x43hipKind\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\r\n\tBLUETOOTH\x10\x01\x12\x08\n\x04WIFI\x10\x02\x12\x07\n\x03UWB\x10\x03\x62\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13netsim/common.proto\x12\rnetsim.common*S\n\x08\x43hipKind\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\r\n\tBLUETOOTH\x10\x01\x12\x08\n\x04WIFI\x10\x02\x12\x07\n\x03UWB\x10\x03\x12\x14\n\x10\x42LUETOOTH_BEACON\x10\x04\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'common_pb2', globals())
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'netsim.common_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_CHIPKIND._serialized_start=31
_CHIPKIND._serialized_end=92
_globals['_CHIPKIND']._serialized_start=38
_globals['_CHIPKIND']._serialized_end=121
# @@protoc_insertion_point(module_scope)

View File

@@ -2,11 +2,17 @@ from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from typing import ClassVar as _ClassVar
BLUETOOTH: ChipKind
DESCRIPTOR: _descriptor.FileDescriptor
UNSPECIFIED: ChipKind
UWB: ChipKind
WIFI: ChipKind
class ChipKind(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
__slots__ = ()
UNSPECIFIED: _ClassVar[ChipKind]
BLUETOOTH: _ClassVar[ChipKind]
WIFI: _ClassVar[ChipKind]
UWB: _ClassVar[ChipKind]
BLUETOOTH_BEACON: _ClassVar[ChipKind]
UNSPECIFIED: ChipKind
BLUETOOTH: ChipKind
WIFI: ChipKind
UWB: ChipKind
BLUETOOTH_BEACON: ChipKind

View File

@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: netsim/hci_packet.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17netsim/hci_packet.proto\x12\rnetsim.packet\"\xb2\x01\n\tHCIPacket\x12\x38\n\x0bpacket_type\x18\x01 \x01(\x0e\x32#.netsim.packet.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"[\n\nPacketType\x12\x1a\n\x16HCI_PACKET_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x43OMMAND\x10\x01\x12\x07\n\x03\x41\x43L\x10\x02\x12\x07\n\x03SCO\x10\x03\x12\t\n\x05\x45VENT\x10\x04\x12\x07\n\x03ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'netsim.hci_packet_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
_globals['DESCRIPTOR']._options = None
_globals['DESCRIPTOR']._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_globals['_HCIPACKET']._serialized_start=43
_globals['_HCIPACKET']._serialized_end=221
_globals['_HCIPACKET_PACKETTYPE']._serialized_start=130
_globals['_HCIPACKET_PACKETTYPE']._serialized_end=221
# @@protoc_insertion_point(module_scope)

View File

@@ -6,17 +6,23 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class HCIPacket(_message.Message):
__slots__ = ["packet", "packet_type"]
__slots__ = ("packet_type", "packet")
class PacketType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
ACL: HCIPacket.PacketType
COMMAND: HCIPacket.PacketType
EVENT: HCIPacket.PacketType
__slots__ = ()
HCI_PACKET_UNSPECIFIED: _ClassVar[HCIPacket.PacketType]
COMMAND: _ClassVar[HCIPacket.PacketType]
ACL: _ClassVar[HCIPacket.PacketType]
SCO: _ClassVar[HCIPacket.PacketType]
EVENT: _ClassVar[HCIPacket.PacketType]
ISO: _ClassVar[HCIPacket.PacketType]
HCI_PACKET_UNSPECIFIED: HCIPacket.PacketType
ISO: HCIPacket.PacketType
PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_TYPE_FIELD_NUMBER: _ClassVar[int]
COMMAND: HCIPacket.PacketType
ACL: HCIPacket.PacketType
SCO: HCIPacket.PacketType
packet: bytes
EVENT: HCIPacket.PacketType
ISO: HCIPacket.PacketType
PACKET_TYPE_FIELD_NUMBER: _ClassVar[int]
PACKET_FIELD_NUMBER: _ClassVar[int]
packet_type: HCIPacket.PacketType
packet: bytes
def __init__(self, packet_type: _Optional[_Union[HCIPacket.PacketType, str]] = ..., packet: _Optional[bytes] = ...) -> None: ...

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,238 @@
from bumble.transport.grpc_protobuf.netsim import common_pb2 as _common_pb2
from google.protobuf import timestamp_pb2 as _timestamp_pb2
from bumble.transport.grpc_protobuf.rootcanal import configuration_pb2 as _configuration_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class PhyKind(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
NONE: _ClassVar[PhyKind]
BLUETOOTH_CLASSIC: _ClassVar[PhyKind]
BLUETOOTH_LOW_ENERGY: _ClassVar[PhyKind]
WIFI: _ClassVar[PhyKind]
UWB: _ClassVar[PhyKind]
WIFI_RTT: _ClassVar[PhyKind]
NONE: PhyKind
BLUETOOTH_CLASSIC: PhyKind
BLUETOOTH_LOW_ENERGY: PhyKind
WIFI: PhyKind
UWB: PhyKind
WIFI_RTT: PhyKind
class Position(_message.Message):
__slots__ = ("x", "y", "z")
X_FIELD_NUMBER: _ClassVar[int]
Y_FIELD_NUMBER: _ClassVar[int]
Z_FIELD_NUMBER: _ClassVar[int]
x: float
y: float
z: float
def __init__(self, x: _Optional[float] = ..., y: _Optional[float] = ..., z: _Optional[float] = ...) -> None: ...
class Orientation(_message.Message):
__slots__ = ("yaw", "pitch", "roll")
YAW_FIELD_NUMBER: _ClassVar[int]
PITCH_FIELD_NUMBER: _ClassVar[int]
ROLL_FIELD_NUMBER: _ClassVar[int]
yaw: float
pitch: float
roll: float
def __init__(self, yaw: _Optional[float] = ..., pitch: _Optional[float] = ..., roll: _Optional[float] = ...) -> None: ...
class Chip(_message.Message):
__slots__ = ("kind", "id", "name", "manufacturer", "product_name", "bt", "ble_beacon", "uwb", "wifi", "offset")
class Radio(_message.Message):
__slots__ = ("state", "range", "tx_count", "rx_count")
STATE_FIELD_NUMBER: _ClassVar[int]
RANGE_FIELD_NUMBER: _ClassVar[int]
TX_COUNT_FIELD_NUMBER: _ClassVar[int]
RX_COUNT_FIELD_NUMBER: _ClassVar[int]
state: bool
range: float
tx_count: int
rx_count: int
def __init__(self, state: bool = ..., range: _Optional[float] = ..., tx_count: _Optional[int] = ..., rx_count: _Optional[int] = ...) -> None: ...
class Bluetooth(_message.Message):
__slots__ = ("low_energy", "classic", "address", "bt_properties")
LOW_ENERGY_FIELD_NUMBER: _ClassVar[int]
CLASSIC_FIELD_NUMBER: _ClassVar[int]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
BT_PROPERTIES_FIELD_NUMBER: _ClassVar[int]
low_energy: Chip.Radio
classic: Chip.Radio
address: str
bt_properties: _configuration_pb2.Controller
def __init__(self, low_energy: _Optional[_Union[Chip.Radio, _Mapping]] = ..., classic: _Optional[_Union[Chip.Radio, _Mapping]] = ..., address: _Optional[str] = ..., bt_properties: _Optional[_Union[_configuration_pb2.Controller, _Mapping]] = ...) -> None: ...
class BleBeacon(_message.Message):
__slots__ = ("bt", "address", "settings", "adv_data", "scan_response")
class AdvertiseSettings(_message.Message):
__slots__ = ("advertise_mode", "milliseconds", "tx_power_level", "dbm", "scannable", "timeout")
class AdvertiseMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
LOW_POWER: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseMode]
BALANCED: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseMode]
LOW_LATENCY: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseMode]
LOW_POWER: Chip.BleBeacon.AdvertiseSettings.AdvertiseMode
BALANCED: Chip.BleBeacon.AdvertiseSettings.AdvertiseMode
LOW_LATENCY: Chip.BleBeacon.AdvertiseSettings.AdvertiseMode
class AdvertiseTxPower(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
ULTRA_LOW: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower]
LOW: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower]
MEDIUM: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower]
HIGH: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower]
ULTRA_LOW: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
LOW: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
MEDIUM: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
HIGH: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
ADVERTISE_MODE_FIELD_NUMBER: _ClassVar[int]
MILLISECONDS_FIELD_NUMBER: _ClassVar[int]
TX_POWER_LEVEL_FIELD_NUMBER: _ClassVar[int]
DBM_FIELD_NUMBER: _ClassVar[int]
SCANNABLE_FIELD_NUMBER: _ClassVar[int]
TIMEOUT_FIELD_NUMBER: _ClassVar[int]
advertise_mode: Chip.BleBeacon.AdvertiseSettings.AdvertiseMode
milliseconds: int
tx_power_level: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
dbm: int
scannable: bool
timeout: int
def __init__(self, advertise_mode: _Optional[_Union[Chip.BleBeacon.AdvertiseSettings.AdvertiseMode, str]] = ..., milliseconds: _Optional[int] = ..., tx_power_level: _Optional[_Union[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower, str]] = ..., dbm: _Optional[int] = ..., scannable: bool = ..., timeout: _Optional[int] = ...) -> None: ...
class AdvertiseData(_message.Message):
__slots__ = ("include_device_name", "include_tx_power_level", "manufacturer_data", "services")
class Service(_message.Message):
__slots__ = ("uuid", "data")
UUID_FIELD_NUMBER: _ClassVar[int]
DATA_FIELD_NUMBER: _ClassVar[int]
uuid: str
data: bytes
def __init__(self, uuid: _Optional[str] = ..., data: _Optional[bytes] = ...) -> None: ...
INCLUDE_DEVICE_NAME_FIELD_NUMBER: _ClassVar[int]
INCLUDE_TX_POWER_LEVEL_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_DATA_FIELD_NUMBER: _ClassVar[int]
SERVICES_FIELD_NUMBER: _ClassVar[int]
include_device_name: bool
include_tx_power_level: bool
manufacturer_data: bytes
services: _containers.RepeatedCompositeFieldContainer[Chip.BleBeacon.AdvertiseData.Service]
def __init__(self, include_device_name: bool = ..., include_tx_power_level: bool = ..., manufacturer_data: _Optional[bytes] = ..., services: _Optional[_Iterable[_Union[Chip.BleBeacon.AdvertiseData.Service, _Mapping]]] = ...) -> None: ...
BT_FIELD_NUMBER: _ClassVar[int]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
SETTINGS_FIELD_NUMBER: _ClassVar[int]
ADV_DATA_FIELD_NUMBER: _ClassVar[int]
SCAN_RESPONSE_FIELD_NUMBER: _ClassVar[int]
bt: Chip.Bluetooth
address: str
settings: Chip.BleBeacon.AdvertiseSettings
adv_data: Chip.BleBeacon.AdvertiseData
scan_response: Chip.BleBeacon.AdvertiseData
def __init__(self, bt: _Optional[_Union[Chip.Bluetooth, _Mapping]] = ..., address: _Optional[str] = ..., settings: _Optional[_Union[Chip.BleBeacon.AdvertiseSettings, _Mapping]] = ..., adv_data: _Optional[_Union[Chip.BleBeacon.AdvertiseData, _Mapping]] = ..., scan_response: _Optional[_Union[Chip.BleBeacon.AdvertiseData, _Mapping]] = ...) -> None: ...
KIND_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
BT_FIELD_NUMBER: _ClassVar[int]
BLE_BEACON_FIELD_NUMBER: _ClassVar[int]
UWB_FIELD_NUMBER: _ClassVar[int]
WIFI_FIELD_NUMBER: _ClassVar[int]
OFFSET_FIELD_NUMBER: _ClassVar[int]
kind: _common_pb2.ChipKind
id: int
name: str
manufacturer: str
product_name: str
bt: Chip.Bluetooth
ble_beacon: Chip.BleBeacon
uwb: Chip.Radio
wifi: Chip.Radio
offset: Position
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., id: _Optional[int] = ..., name: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., bt: _Optional[_Union[Chip.Bluetooth, _Mapping]] = ..., ble_beacon: _Optional[_Union[Chip.BleBeacon, _Mapping]] = ..., uwb: _Optional[_Union[Chip.Radio, _Mapping]] = ..., wifi: _Optional[_Union[Chip.Radio, _Mapping]] = ..., offset: _Optional[_Union[Position, _Mapping]] = ...) -> None: ...
class ChipCreate(_message.Message):
__slots__ = ("kind", "address", "name", "manufacturer", "product_name", "ble_beacon", "bt_properties")
class BleBeaconCreate(_message.Message):
__slots__ = ("address", "settings", "adv_data", "scan_response")
ADDRESS_FIELD_NUMBER: _ClassVar[int]
SETTINGS_FIELD_NUMBER: _ClassVar[int]
ADV_DATA_FIELD_NUMBER: _ClassVar[int]
SCAN_RESPONSE_FIELD_NUMBER: _ClassVar[int]
address: str
settings: Chip.BleBeacon.AdvertiseSettings
adv_data: Chip.BleBeacon.AdvertiseData
scan_response: Chip.BleBeacon.AdvertiseData
def __init__(self, address: _Optional[str] = ..., settings: _Optional[_Union[Chip.BleBeacon.AdvertiseSettings, _Mapping]] = ..., adv_data: _Optional[_Union[Chip.BleBeacon.AdvertiseData, _Mapping]] = ..., scan_response: _Optional[_Union[Chip.BleBeacon.AdvertiseData, _Mapping]] = ...) -> None: ...
KIND_FIELD_NUMBER: _ClassVar[int]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
BLE_BEACON_FIELD_NUMBER: _ClassVar[int]
BT_PROPERTIES_FIELD_NUMBER: _ClassVar[int]
kind: _common_pb2.ChipKind
address: str
name: str
manufacturer: str
product_name: str
ble_beacon: ChipCreate.BleBeaconCreate
bt_properties: _configuration_pb2.Controller
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., address: _Optional[str] = ..., name: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., ble_beacon: _Optional[_Union[ChipCreate.BleBeaconCreate, _Mapping]] = ..., bt_properties: _Optional[_Union[_configuration_pb2.Controller, _Mapping]] = ...) -> None: ...
class Device(_message.Message):
__slots__ = ("id", "name", "visible", "position", "orientation", "chips")
ID_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
VISIBLE_FIELD_NUMBER: _ClassVar[int]
POSITION_FIELD_NUMBER: _ClassVar[int]
ORIENTATION_FIELD_NUMBER: _ClassVar[int]
CHIPS_FIELD_NUMBER: _ClassVar[int]
id: int
name: str
visible: bool
position: Position
orientation: Orientation
chips: _containers.RepeatedCompositeFieldContainer[Chip]
def __init__(self, id: _Optional[int] = ..., name: _Optional[str] = ..., visible: bool = ..., position: _Optional[_Union[Position, _Mapping]] = ..., orientation: _Optional[_Union[Orientation, _Mapping]] = ..., chips: _Optional[_Iterable[_Union[Chip, _Mapping]]] = ...) -> None: ...
class DeviceCreate(_message.Message):
__slots__ = ("name", "position", "orientation", "chips")
NAME_FIELD_NUMBER: _ClassVar[int]
POSITION_FIELD_NUMBER: _ClassVar[int]
ORIENTATION_FIELD_NUMBER: _ClassVar[int]
CHIPS_FIELD_NUMBER: _ClassVar[int]
name: str
position: Position
orientation: Orientation
chips: _containers.RepeatedCompositeFieldContainer[ChipCreate]
def __init__(self, name: _Optional[str] = ..., position: _Optional[_Union[Position, _Mapping]] = ..., orientation: _Optional[_Union[Orientation, _Mapping]] = ..., chips: _Optional[_Iterable[_Union[ChipCreate, _Mapping]]] = ...) -> None: ...
class Scene(_message.Message):
__slots__ = ("devices",)
DEVICES_FIELD_NUMBER: _ClassVar[int]
devices: _containers.RepeatedCompositeFieldContainer[Device]
def __init__(self, devices: _Optional[_Iterable[_Union[Device, _Mapping]]] = ...) -> None: ...
class Capture(_message.Message):
__slots__ = ("id", "chip_kind", "device_name", "state", "size", "records", "timestamp", "valid")
ID_FIELD_NUMBER: _ClassVar[int]
CHIP_KIND_FIELD_NUMBER: _ClassVar[int]
DEVICE_NAME_FIELD_NUMBER: _ClassVar[int]
STATE_FIELD_NUMBER: _ClassVar[int]
SIZE_FIELD_NUMBER: _ClassVar[int]
RECORDS_FIELD_NUMBER: _ClassVar[int]
TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
VALID_FIELD_NUMBER: _ClassVar[int]
id: int
chip_kind: _common_pb2.ChipKind
device_name: str
state: bool
size: int
records: int
timestamp: _timestamp_pb2.Timestamp
valid: bool
def __init__(self, id: _Optional[int] = ..., chip_kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., device_name: _Optional[str] = ..., state: bool = ..., size: _Optional[int] = ..., records: _Optional[int] = ..., timestamp: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., valid: bool = ...) -> None: ...

View File

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: netsim/packet_streamer.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from bumble.transport.grpc_protobuf.netsim import hci_packet_pb2 as netsim_dot_hci__packet__pb2
from bumble.transport.grpc_protobuf.netsim import startup_pb2 as netsim_dot_startup__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1cnetsim/packet_streamer.proto\x12\rnetsim.packet\x1a\x17netsim/hci_packet.proto\x1a\x14netsim/startup.proto\"\x93\x01\n\rPacketRequest\x12\x30\n\x0cinitial_info\x18\x01 \x01(\x0b\x32\x18.netsim.startup.ChipInfoH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0e\n\x0crequest_type\"t\n\x0ePacketResponse\x12\x0f\n\x05\x65rror\x18\x01 \x01(\tH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0f\n\rresponse_type2b\n\x0ePacketStreamer\x12P\n\rStreamPackets\x12\x1c.netsim.packet.PacketRequest\x1a\x1d.netsim.packet.PacketResponse(\x01\x30\x01\x62\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'netsim.packet_streamer_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals['_PACKETREQUEST']._serialized_start=95
_globals['_PACKETREQUEST']._serialized_end=242
_globals['_PACKETRESPONSE']._serialized_start=244
_globals['_PACKETRESPONSE']._serialized_end=360
_globals['_PACKETSTREAMER']._serialized_start=362
_globals['_PACKETSTREAMER']._serialized_end=460
# @@protoc_insertion_point(module_scope)

View File

@@ -1,5 +1,5 @@
from . import hci_packet_pb2 as _hci_packet_pb2
from . import startup_pb2 as _startup_pb2
from bumble.transport.grpc_protobuf.netsim import hci_packet_pb2 as _hci_packet_pb2
from bumble.transport.grpc_protobuf.netsim import startup_pb2 as _startup_pb2
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union
@@ -7,17 +7,17 @@ from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Opti
DESCRIPTOR: _descriptor.FileDescriptor
class PacketRequest(_message.Message):
__slots__ = ["hci_packet", "initial_info", "packet"]
HCI_PACKET_FIELD_NUMBER: _ClassVar[int]
__slots__ = ("initial_info", "hci_packet", "packet")
INITIAL_INFO_FIELD_NUMBER: _ClassVar[int]
HCI_PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_FIELD_NUMBER: _ClassVar[int]
hci_packet: _hci_packet_pb2.HCIPacket
initial_info: _startup_pb2.ChipInfo
hci_packet: _hci_packet_pb2.HCIPacket
packet: bytes
def __init__(self, initial_info: _Optional[_Union[_startup_pb2.ChipInfo, _Mapping]] = ..., hci_packet: _Optional[_Union[_hci_packet_pb2.HCIPacket, _Mapping]] = ..., packet: _Optional[bytes] = ...) -> None: ...
class PacketResponse(_message.Message):
__slots__ = ["error", "hci_packet", "packet"]
__slots__ = ("error", "hci_packet", "packet")
ERROR_FIELD_NUMBER: _ClassVar[int]
HCI_PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_FIELD_NUMBER: _ClassVar[int]

View File

@@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import packet_streamer_pb2 as packet__streamer__pb2
from bumble.transport.grpc_protobuf.netsim import packet_streamer_pb2 as netsim_dot_packet__streamer__pb2
class PacketStreamerStub(object):
@@ -30,8 +30,8 @@ class PacketStreamerStub(object):
"""
self.StreamPackets = channel.stream_stream(
'/netsim.packet.PacketStreamer/StreamPackets',
request_serializer=packet__streamer__pb2.PacketRequest.SerializeToString,
response_deserializer=packet__streamer__pb2.PacketResponse.FromString,
request_serializer=netsim_dot_packet__streamer__pb2.PacketRequest.SerializeToString,
response_deserializer=netsim_dot_packet__streamer__pb2.PacketResponse.FromString,
)
@@ -64,8 +64,8 @@ def add_PacketStreamerServicer_to_server(servicer, server):
rpc_method_handlers = {
'StreamPackets': grpc.stream_stream_rpc_method_handler(
servicer.StreamPackets,
request_deserializer=packet__streamer__pb2.PacketRequest.FromString,
response_serializer=packet__streamer__pb2.PacketResponse.SerializeToString,
request_deserializer=netsim_dot_packet__streamer__pb2.PacketRequest.FromString,
response_serializer=netsim_dot_packet__streamer__pb2.PacketResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
@@ -103,7 +103,7 @@ class PacketStreamer(object):
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/netsim.packet.PacketStreamer/StreamPackets',
packet__streamer__pb2.PacketRequest.SerializeToString,
packet__streamer__pb2.PacketResponse.FromString,
netsim_dot_packet__streamer__pb2.PacketRequest.SerializeToString,
netsim_dot_packet__streamer__pb2.PacketResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: netsim/startup.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from bumble.transport.grpc_protobuf.netsim import common_pb2 as netsim_dot_common__pb2
from bumble.transport.grpc_protobuf.netsim import model_pb2 as netsim_dot_model__pb2
from bumble.transport.grpc_protobuf.rootcanal import configuration_pb2 as rootcanal_dot_configuration__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14netsim/startup.proto\x12\x0enetsim.startup\x1a\x13netsim/common.proto\x1a\x12netsim/model.proto\x1a\x1drootcanal/configuration.proto\"\xb4\x01\n\x0bStartupInfo\x12\x33\n\x07\x64\x65vices\x18\x01 \x03(\x0b\x32\".netsim.startup.StartupInfo.Device\x1ap\n\x06\x44\x65vice\x12\x10\n\x04name\x18\x01 \x01(\tB\x02\x18\x01\x12#\n\x05\x63hips\x18\x02 \x03(\x0b\x32\x14.netsim.startup.Chip\x12/\n\x0b\x64\x65vice_info\x18\x03 \x01(\x0b\x32\x1a.netsim.startup.DeviceInfo\"q\n\x08\x43hipInfo\x12\x10\n\x04name\x18\x01 \x01(\tB\x02\x18\x01\x12\"\n\x04\x63hip\x18\x02 \x01(\x0b\x32\x14.netsim.startup.Chip\x12/\n\x0b\x64\x65vice_info\x18\x03 \x01(\x0b\x32\x1a.netsim.startup.DeviceInfo\"\x7f\n\nDeviceInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04kind\x18\x02 \x01(\t\x12\x0f\n\x07version\x18\x03 \x01(\t\x12\x13\n\x0bsdk_version\x18\x04 \x01(\t\x12\x10\n\x08\x62uild_id\x18\x05 \x01(\t\x12\x0f\n\x07variant\x18\x06 \x01(\t\x12\x0c\n\x04\x61rch\x18\x07 \x01(\t\"\x9b\x02\n\x04\x43hip\x12%\n\x04kind\x18\x01 \x01(\x0e\x32\x17.netsim.common.ChipKind\x12\n\n\x02id\x18\x02 \x01(\t\x12\x14\n\x0cmanufacturer\x18\x03 \x01(\t\x12\x14\n\x0cproduct_name\x18\x04 \x01(\t\x12\r\n\x05\x66\x64_in\x18\x05 \x01(\x05\x12\x0e\n\x06\x66\x64_out\x18\x06 \x01(\x05\x12\x10\n\x08loopback\x18\x07 \x01(\x08\x12:\n\rbt_properties\x18\x08 \x01(\x0b\x32#.rootcanal.configuration.Controller\x12\x0f\n\x07\x61\x64\x64ress\x18\t \x01(\t\x12+\n\x06offset\x18\n \x01(\x0b\x32\x16.netsim.model.PositionH\x00\x88\x01\x01\x42\t\n\x07_offsetb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'netsim.startup_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals['_STARTUPINFO_DEVICE'].fields_by_name['name']._options = None
_globals['_STARTUPINFO_DEVICE'].fields_by_name['name']._serialized_options = b'\030\001'
_globals['_CHIPINFO'].fields_by_name['name']._options = None
_globals['_CHIPINFO'].fields_by_name['name']._serialized_options = b'\030\001'
_globals['_STARTUPINFO']._serialized_start=113
_globals['_STARTUPINFO']._serialized_end=293
_globals['_STARTUPINFO_DEVICE']._serialized_start=181
_globals['_STARTUPINFO_DEVICE']._serialized_end=293
_globals['_CHIPINFO']._serialized_start=295
_globals['_CHIPINFO']._serialized_end=408
_globals['_DEVICEINFO']._serialized_start=410
_globals['_DEVICEINFO']._serialized_end=537
_globals['_CHIP']._serialized_start=540
_globals['_CHIP']._serialized_end=823
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,76 @@
from bumble.transport.grpc_protobuf.netsim import common_pb2 as _common_pb2
from bumble.transport.grpc_protobuf.netsim import model_pb2 as _model_pb2
from bumble.transport.grpc_protobuf.rootcanal import configuration_pb2 as _configuration_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class StartupInfo(_message.Message):
__slots__ = ("devices",)
class Device(_message.Message):
__slots__ = ("name", "chips", "device_info")
NAME_FIELD_NUMBER: _ClassVar[int]
CHIPS_FIELD_NUMBER: _ClassVar[int]
DEVICE_INFO_FIELD_NUMBER: _ClassVar[int]
name: str
chips: _containers.RepeatedCompositeFieldContainer[Chip]
device_info: DeviceInfo
def __init__(self, name: _Optional[str] = ..., chips: _Optional[_Iterable[_Union[Chip, _Mapping]]] = ..., device_info: _Optional[_Union[DeviceInfo, _Mapping]] = ...) -> None: ...
DEVICES_FIELD_NUMBER: _ClassVar[int]
devices: _containers.RepeatedCompositeFieldContainer[StartupInfo.Device]
def __init__(self, devices: _Optional[_Iterable[_Union[StartupInfo.Device, _Mapping]]] = ...) -> None: ...
class ChipInfo(_message.Message):
__slots__ = ("name", "chip", "device_info")
NAME_FIELD_NUMBER: _ClassVar[int]
CHIP_FIELD_NUMBER: _ClassVar[int]
DEVICE_INFO_FIELD_NUMBER: _ClassVar[int]
name: str
chip: Chip
device_info: DeviceInfo
def __init__(self, name: _Optional[str] = ..., chip: _Optional[_Union[Chip, _Mapping]] = ..., device_info: _Optional[_Union[DeviceInfo, _Mapping]] = ...) -> None: ...
class DeviceInfo(_message.Message):
__slots__ = ("name", "kind", "version", "sdk_version", "build_id", "variant", "arch")
NAME_FIELD_NUMBER: _ClassVar[int]
KIND_FIELD_NUMBER: _ClassVar[int]
VERSION_FIELD_NUMBER: _ClassVar[int]
SDK_VERSION_FIELD_NUMBER: _ClassVar[int]
BUILD_ID_FIELD_NUMBER: _ClassVar[int]
VARIANT_FIELD_NUMBER: _ClassVar[int]
ARCH_FIELD_NUMBER: _ClassVar[int]
name: str
kind: str
version: str
sdk_version: str
build_id: str
variant: str
arch: str
def __init__(self, name: _Optional[str] = ..., kind: _Optional[str] = ..., version: _Optional[str] = ..., sdk_version: _Optional[str] = ..., build_id: _Optional[str] = ..., variant: _Optional[str] = ..., arch: _Optional[str] = ...) -> None: ...
class Chip(_message.Message):
__slots__ = ("kind", "id", "manufacturer", "product_name", "fd_in", "fd_out", "loopback", "bt_properties", "address", "offset")
KIND_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
FD_IN_FIELD_NUMBER: _ClassVar[int]
FD_OUT_FIELD_NUMBER: _ClassVar[int]
LOOPBACK_FIELD_NUMBER: _ClassVar[int]
BT_PROPERTIES_FIELD_NUMBER: _ClassVar[int]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
OFFSET_FIELD_NUMBER: _ClassVar[int]
kind: _common_pb2.ChipKind
id: str
manufacturer: str
product_name: str
fd_in: int
fd_out: int
loopback: bool
bt_properties: _configuration_pb2.Controller
address: str
offset: _model_pb2.Position
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., id: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., fd_in: _Optional[int] = ..., fd_out: _Optional[int] = ..., loopback: bool = ..., bt_properties: _Optional[_Union[_configuration_pb2.Controller, _Mapping]] = ..., address: _Optional[str] = ..., offset: _Optional[_Union[_model_pb2.Position, _Mapping]] = ...) -> None: ...

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -1,31 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: packet_streamer.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import hci_packet_pb2 as hci__packet__pb2
from . import startup_pb2 as startup__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15packet_streamer.proto\x12\rnetsim.packet\x1a\x10hci_packet.proto\x1a\rstartup.proto\"\x93\x01\n\rPacketRequest\x12\x30\n\x0cinitial_info\x18\x01 \x01(\x0b\x32\x18.netsim.startup.ChipInfoH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0e\n\x0crequest_type\"t\n\x0ePacketResponse\x12\x0f\n\x05\x65rror\x18\x01 \x01(\tH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0f\n\rresponse_type2b\n\x0ePacketStreamer\x12P\n\rStreamPackets\x12\x1c.netsim.packet.PacketRequest\x1a\x1d.netsim.packet.PacketResponse(\x01\x30\x01\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'packet_streamer_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_PACKETREQUEST._serialized_start=74
_PACKETREQUEST._serialized_end=221
_PACKETRESPONSE._serialized_start=223
_PACKETRESPONSE._serialized_end=339
_PACKETSTREAMER._serialized_start=341
_PACKETSTREAMER._serialized_end=439
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: rootcanal/configuration.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1drootcanal/configuration.proto\x12\x17rootcanal.configuration\"\xbc\x01\n\x12\x43ontrollerFeatures\x12\x1f\n\x17le_extended_advertising\x18\x01 \x01(\x08\x12\x1f\n\x17le_periodic_advertising\x18\x02 \x01(\x08\x12\x12\n\nll_privacy\x18\x03 \x01(\x08\x12\x11\n\tle_2m_phy\x18\x04 \x01(\x08\x12\x14\n\x0cle_coded_phy\x18\x05 \x01(\x08\x12\'\n\x1fle_connected_isochronous_stream\x18\x06 \x01(\x08\"\x8d\x01\n\x10\x43ontrollerQuirks\x12\x30\n(send_acl_data_before_connection_complete\x18\x01 \x01(\x08\x12\"\n\x1ahas_default_random_address\x18\x02 \x01(\x08\x12#\n\x1bhardware_error_before_reset\x18\x03 \x01(\x08\".\n\x0eVendorFeatures\x12\x0b\n\x03\x63sr\x18\x01 \x01(\x08\x12\x0f\n\x07\x61ndroid\x18\x02 \x01(\x08\"\x8a\x02\n\nController\x12\x39\n\x06preset\x18\x01 \x01(\x0e\x32).rootcanal.configuration.ControllerPreset\x12=\n\x08\x66\x65\x61tures\x18\x02 \x01(\x0b\x32+.rootcanal.configuration.ControllerFeatures\x12\x39\n\x06quirks\x18\x03 \x01(\x0b\x32).rootcanal.configuration.ControllerQuirks\x12\x0e\n\x06strict\x18\x04 \x01(\x08\x12\x37\n\x06vendor\x18\x05 \x01(\x0b\x32\'.rootcanal.configuration.VendorFeatures\"Y\n\tTcpServer\x12\x10\n\x08tcp_port\x18\x01 \x02(\x05\x12:\n\rconfiguration\x18\x02 \x01(\x0b\x32#.rootcanal.configuration.Controller\"G\n\rConfiguration\x12\x36\n\ntcp_server\x18\x01 \x03(\x0b\x32\".rootcanal.configuration.TcpServer*H\n\x10\x43ontrollerPreset\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x0f\n\x0bLAIRD_BL654\x10\x01\x12\x16\n\x12\x43SR_RCK_PTS_DONGLE\x10\x02\x42\x02H\x02')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'rootcanal.configuration_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
_globals['DESCRIPTOR']._options = None
_globals['DESCRIPTOR']._serialized_options = b'H\002'
_globals['_CONTROLLERPRESET']._serialized_start=874
_globals['_CONTROLLERPRESET']._serialized_end=946
_globals['_CONTROLLERFEATURES']._serialized_start=59
_globals['_CONTROLLERFEATURES']._serialized_end=247
_globals['_CONTROLLERQUIRKS']._serialized_start=250
_globals['_CONTROLLERQUIRKS']._serialized_end=391
_globals['_VENDORFEATURES']._serialized_start=393
_globals['_VENDORFEATURES']._serialized_end=439
_globals['_CONTROLLER']._serialized_start=442
_globals['_CONTROLLER']._serialized_end=708
_globals['_TCPSERVER']._serialized_start=710
_globals['_TCPSERVER']._serialized_end=799
_globals['_CONFIGURATION']._serialized_start=801
_globals['_CONFIGURATION']._serialized_end=872
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,78 @@
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class ControllerPreset(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
DEFAULT: _ClassVar[ControllerPreset]
LAIRD_BL654: _ClassVar[ControllerPreset]
CSR_RCK_PTS_DONGLE: _ClassVar[ControllerPreset]
DEFAULT: ControllerPreset
LAIRD_BL654: ControllerPreset
CSR_RCK_PTS_DONGLE: ControllerPreset
class ControllerFeatures(_message.Message):
__slots__ = ("le_extended_advertising", "le_periodic_advertising", "ll_privacy", "le_2m_phy", "le_coded_phy", "le_connected_isochronous_stream")
LE_EXTENDED_ADVERTISING_FIELD_NUMBER: _ClassVar[int]
LE_PERIODIC_ADVERTISING_FIELD_NUMBER: _ClassVar[int]
LL_PRIVACY_FIELD_NUMBER: _ClassVar[int]
LE_2M_PHY_FIELD_NUMBER: _ClassVar[int]
LE_CODED_PHY_FIELD_NUMBER: _ClassVar[int]
LE_CONNECTED_ISOCHRONOUS_STREAM_FIELD_NUMBER: _ClassVar[int]
le_extended_advertising: bool
le_periodic_advertising: bool
ll_privacy: bool
le_2m_phy: bool
le_coded_phy: bool
le_connected_isochronous_stream: bool
def __init__(self, le_extended_advertising: bool = ..., le_periodic_advertising: bool = ..., ll_privacy: bool = ..., le_2m_phy: bool = ..., le_coded_phy: bool = ..., le_connected_isochronous_stream: bool = ...) -> None: ...
class ControllerQuirks(_message.Message):
__slots__ = ("send_acl_data_before_connection_complete", "has_default_random_address", "hardware_error_before_reset")
SEND_ACL_DATA_BEFORE_CONNECTION_COMPLETE_FIELD_NUMBER: _ClassVar[int]
HAS_DEFAULT_RANDOM_ADDRESS_FIELD_NUMBER: _ClassVar[int]
HARDWARE_ERROR_BEFORE_RESET_FIELD_NUMBER: _ClassVar[int]
send_acl_data_before_connection_complete: bool
has_default_random_address: bool
hardware_error_before_reset: bool
def __init__(self, send_acl_data_before_connection_complete: bool = ..., has_default_random_address: bool = ..., hardware_error_before_reset: bool = ...) -> None: ...
class VendorFeatures(_message.Message):
__slots__ = ("csr", "android")
CSR_FIELD_NUMBER: _ClassVar[int]
ANDROID_FIELD_NUMBER: _ClassVar[int]
csr: bool
android: bool
def __init__(self, csr: bool = ..., android: bool = ...) -> None: ...
class Controller(_message.Message):
__slots__ = ("preset", "features", "quirks", "strict", "vendor")
PRESET_FIELD_NUMBER: _ClassVar[int]
FEATURES_FIELD_NUMBER: _ClassVar[int]
QUIRKS_FIELD_NUMBER: _ClassVar[int]
STRICT_FIELD_NUMBER: _ClassVar[int]
VENDOR_FIELD_NUMBER: _ClassVar[int]
preset: ControllerPreset
features: ControllerFeatures
quirks: ControllerQuirks
strict: bool
vendor: VendorFeatures
def __init__(self, preset: _Optional[_Union[ControllerPreset, str]] = ..., features: _Optional[_Union[ControllerFeatures, _Mapping]] = ..., quirks: _Optional[_Union[ControllerQuirks, _Mapping]] = ..., strict: bool = ..., vendor: _Optional[_Union[VendorFeatures, _Mapping]] = ...) -> None: ...
class TcpServer(_message.Message):
__slots__ = ("tcp_port", "configuration")
TCP_PORT_FIELD_NUMBER: _ClassVar[int]
CONFIGURATION_FIELD_NUMBER: _ClassVar[int]
tcp_port: int
configuration: Controller
def __init__(self, tcp_port: _Optional[int] = ..., configuration: _Optional[_Union[Controller, _Mapping]] = ...) -> None: ...
class Configuration(_message.Message):
__slots__ = ("tcp_server",)
TCP_SERVER_FIELD_NUMBER: _ClassVar[int]
tcp_server: _containers.RepeatedCompositeFieldContainer[TcpServer]
def __init__(self, tcp_server: _Optional[_Iterable[_Union[TcpServer, _Mapping]]] = ...) -> None: ...

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: startup.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import common_pb2 as common__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rstartup.proto\x12\x0enetsim.startup\x1a\x0c\x63ommon.proto\"\x7f\n\x0bStartupInfo\x12\x33\n\x07\x64\x65vices\x18\x01 \x03(\x0b\x32\".netsim.startup.StartupInfo.Device\x1a;\n\x06\x44\x65vice\x12\x0c\n\x04name\x18\x01 \x01(\t\x12#\n\x05\x63hips\x18\x02 \x03(\x0b\x32\x14.netsim.startup.Chip\"<\n\x08\x43hipInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\"\n\x04\x63hip\x18\x02 \x01(\x0b\x32\x14.netsim.startup.Chip\"\x96\x01\n\x04\x43hip\x12%\n\x04kind\x18\x01 \x01(\x0e\x32\x17.netsim.common.ChipKind\x12\n\n\x02id\x18\x02 \x01(\t\x12\x14\n\x0cmanufacturer\x18\x03 \x01(\t\x12\x14\n\x0cproduct_name\x18\x04 \x01(\t\x12\r\n\x05\x66\x64_in\x18\x05 \x01(\x05\x12\x0e\n\x06\x66\x64_out\x18\x06 \x01(\x05\x12\x10\n\x08loopback\x18\x07 \x01(\x08\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'startup_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_STARTUPINFO._serialized_start=47
_STARTUPINFO._serialized_end=174
_STARTUPINFO_DEVICE._serialized_start=115
_STARTUPINFO_DEVICE._serialized_end=174
_CHIPINFO._serialized_start=176
_CHIPINFO._serialized_end=236
_CHIP._serialized_start=239
_CHIP._serialized_end=389
# @@protoc_insertion_point(module_scope)

View File

@@ -1,46 +0,0 @@
from . import common_pb2 as _common_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class Chip(_message.Message):
__slots__ = ["fd_in", "fd_out", "id", "kind", "loopback", "manufacturer", "product_name"]
FD_IN_FIELD_NUMBER: _ClassVar[int]
FD_OUT_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
KIND_FIELD_NUMBER: _ClassVar[int]
LOOPBACK_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
fd_in: int
fd_out: int
id: str
kind: _common_pb2.ChipKind
loopback: bool
manufacturer: str
product_name: str
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., id: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., fd_in: _Optional[int] = ..., fd_out: _Optional[int] = ..., loopback: bool = ...) -> None: ...
class ChipInfo(_message.Message):
__slots__ = ["chip", "name"]
CHIP_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
chip: Chip
name: str
def __init__(self, name: _Optional[str] = ..., chip: _Optional[_Union[Chip, _Mapping]] = ...) -> None: ...
class StartupInfo(_message.Message):
__slots__ = ["devices"]
class Device(_message.Message):
__slots__ = ["chips", "name"]
CHIPS_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
chips: _containers.RepeatedCompositeFieldContainer[Chip]
name: str
def __init__(self, name: _Optional[str] = ..., chips: _Optional[_Iterable[_Union[Chip, _Mapping]]] = ...) -> None: ...
DEVICES_FIELD_NUMBER: _ClassVar[int]
devices: _containers.RepeatedCompositeFieldContainer[StartupInfo.Device]
def __init__(self, devices: _Optional[_Iterable[_Union[StartupInfo.Device, _Mapping]]] = ...) -> None: ...

View File

@@ -1,14 +1,23 @@
# Invoke this script with an argument pointing to where the AOSP `tools/netsim/src/proto` is
# Invoke this script with two arguments:
# Arg 1: directory path where the AOSP `tools/netsim/proto` directory is located
# Arg 2: directory path where the RootCanal `proto/rootcanal` directory is located
PROTOC_OUT=bumble/transport/grpc_protobuf
proto_files=(common.proto packet_streamer.proto hci_packet.proto startup.proto)
for proto_file in "${proto_files[@]}"
netsim_proto_files=(netsim/common.proto netsim/packet_streamer.proto netsim/hci_packet.proto netsim/startup.proto netsim/model.proto)
for proto_file in "${netsim_proto_files[@]}"
do
python -m grpc_tools.protoc -I$1 --proto_path=bumble/transport --python_out=$PROTOC_OUT --pyi_out=$PROTOC_OUT --grpc_python_out=$PROTOC_OUT $1/$proto_file
python -m grpc_tools.protoc -I$1 -I$2 --proto_path=bumble/transport --python_out=$PROTOC_OUT --pyi_out=$PROTOC_OUT --grpc_python_out=$PROTOC_OUT $1/$proto_file
done
python_files=(packet_streamer_pb2_grpc.py packet_streamer_pb2.py hci_packet_pb2.py startup_pb2.py)
rootcanal_proto_files=(rootcanal/configuration.proto)
for proto_file in "${rootcanal_proto_files[@]}"
do
python -m grpc_tools.protoc -I$1 -I$2 --proto_path=bumble/transport --python_out=$PROTOC_OUT --pyi_out=$PROTOC_OUT --grpc_python_out=$PROTOC_OUT $2/$proto_file
done
python_files=(netsim/*.py netsim/*.pyi)
for python_file in "${python_files[@]}"
do
sed -i 's/^import .*_pb2 as/from . \0/' $PROTOC_OUT/$python_file
sed -i '' 's/^from netsim/from bumble.transport.grpc_protobuf.netsim/' $PROTOC_OUT/$python_file
sed -i '' 's/^from rootcanal/from bumble.transport.grpc_protobuf.rootcanal/' $PROTOC_OUT/$python_file
done