Merge pull request #833 from google/gbg/netsim_enhancements

android-netsim transport enhancements
This commit is contained in:
Gilles Boccon-Gibod
2025-12-15 15:38:20 -08:00
committed by GitHub

View File

@@ -156,21 +156,26 @@ async def open_android_netsim_controller_transport(
logger.warning("unable to publish gRPC port")
class HciDevice:
def __init__(self, context, on_data_received):
def __init__(self, context, server):
self.context = context
self.on_data_received = on_data_received
self.server = server
self.name = None
self.sink = None
self.loop = asyncio.get_running_loop()
self.done = self.loop.create_future()
self.task = self.loop.create_task(self.pump())
async def pump(self):
try:
await self.pump_loop()
except asyncio.CancelledError:
logger.debug('Pump task canceled')
if not self.done.done():
self.done.set_result(None)
finally:
if self.sink:
logger.debug('Releasing sink')
self.server.release_sink()
self.sink = None
logger.debug('Pump task terminated')
async def pump_loop(self):
while True:
@@ -186,15 +191,26 @@ async def open_android_netsim_controller_transport(
if request.WhichOneof('request_type') == 'initial_info':
logger.debug(f'Received initial info: {request}')
self.name = request.initial_info.name
# We only accept BLUETOOTH
if request.initial_info.chip.kind != ChipKind.BLUETOOTH:
logger.warning('Unsupported chip type')
error = PacketResponse(error='Unsupported chip type')
await self.context.write(error)
return
# return
continue
self.name = request.initial_info.name
continue
# Lease the sink so that no other device can send
self.sink = self.server.lease_sink(self)
if self.sink is None:
logger.warning('Another device is already connected')
error = PacketResponse(error='Device busy')
await self.context.write(error)
# return
continue
continue
# Expect a data packet
request_type = request.WhichOneof('request_type')
@@ -205,10 +221,10 @@ async def open_android_netsim_controller_transport(
continue
# Process the packet
data = (
assert self.sink is not None
self.sink(
bytes([request.hci_packet.packet_type]) + request.hci_packet.packet
)
self.on_data_received(data)
async def send_packet(self, data):
return await self.context.write(
@@ -217,12 +233,6 @@ async def open_android_netsim_controller_transport(
)
)
def terminate(self):
self.task.cancel()
async def wait_for_termination(self):
await self.done
server_address = f'{server_host}:{server_port}'
class Server(PacketStreamerServicer, ParserSource):
@@ -258,27 +268,27 @@ async def open_android_netsim_controller_transport(
return await self.device.send_packet(packet)
async def StreamPackets(self, _request_iterator, context):
def lease_sink(self, device):
if self.device:
return None
self.device = device
return self.parser.feed_data
def release_sink(self):
self.device = None
async def StreamPackets(self, request_iterator, context):
logger.debug('StreamPackets request')
# Check that we don't already have a device
if self.device:
logger.debug('Busy, already serving a device')
return PacketResponse(error='Busy')
# Instantiate a new device
self.device = HciDevice(context, self.parser.feed_data)
device = HciDevice(context, self)
# Wait for the device to terminate
logger.debug('Waiting for device to terminate')
# Pump packets to/from the device
logger.debug('Pumping device packets')
try:
await self.device.wait_for_termination()
except asyncio.CancelledError:
logger.debug('Request canceled')
self.device.terminate()
logger.debug('Device terminated')
self.device = None
await device.pump()
finally:
logger.debug('Pump terminated')
server = Server()
await server.start()