android-netsim transport enhancements

This commit is contained in:
Gilles Boccon-Gibod
2025-12-13 16:27:04 -08:00
parent 6649464cd6
commit 302e496890

View File

@@ -156,21 +156,26 @@ async def open_android_netsim_controller_transport(
logger.warning("unable to publish gRPC port") logger.warning("unable to publish gRPC port")
class HciDevice: class HciDevice:
def __init__(self, context, on_data_received): def __init__(self, context, server):
self.context = context self.context = context
self.on_data_received = on_data_received self.server = server
self.name = None self.name = None
self.sink = None
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.done = self.loop.create_future() self.done = self.loop.create_future()
self.task = self.loop.create_task(self.pump())
async def pump(self): async def pump(self):
try: try:
await self.pump_loop() await self.pump_loop()
except asyncio.CancelledError: except asyncio.CancelledError:
logger.debug('Pump task canceled') logger.debug('Pump task canceled')
if not self.done.done(): finally:
self.done.set_result(None) if self.sink:
logger.debug('Releasing sink')
self.server.release_sink()
self.sink = None
logger.debug('Pump task terminated')
async def pump_loop(self): async def pump_loop(self):
while True: while True:
@@ -186,15 +191,26 @@ async def open_android_netsim_controller_transport(
if request.WhichOneof('request_type') == 'initial_info': if request.WhichOneof('request_type') == 'initial_info':
logger.debug(f'Received initial info: {request}') logger.debug(f'Received initial info: {request}')
self.name = request.initial_info.name
# We only accept BLUETOOTH # We only accept BLUETOOTH
if request.initial_info.chip.kind != ChipKind.BLUETOOTH: if request.initial_info.chip.kind != ChipKind.BLUETOOTH:
logger.warning('Unsupported chip type') logger.warning('Unsupported chip type')
error = PacketResponse(error='Unsupported chip type') error = PacketResponse(error='Unsupported chip type')
await self.context.write(error) await self.context.write(error)
return # return
continue
self.name = request.initial_info.name # Lease the sink so that no other device can send
continue self.sink = self.server.lease_sink(self)
if self.sink is None:
logger.warning('Another device is already connected')
error = PacketResponse(error='Device busy')
await self.context.write(error)
# return
continue
continue
# Expect a data packet # Expect a data packet
request_type = request.WhichOneof('request_type') request_type = request.WhichOneof('request_type')
@@ -205,10 +221,10 @@ async def open_android_netsim_controller_transport(
continue continue
# Process the packet # Process the packet
data = ( assert self.sink is not None
self.sink(
bytes([request.hci_packet.packet_type]) + request.hci_packet.packet bytes([request.hci_packet.packet_type]) + request.hci_packet.packet
) )
self.on_data_received(data)
async def send_packet(self, data): async def send_packet(self, data):
return await self.context.write( return await self.context.write(
@@ -217,12 +233,6 @@ async def open_android_netsim_controller_transport(
) )
) )
def terminate(self):
self.task.cancel()
async def wait_for_termination(self):
await self.done
server_address = f'{server_host}:{server_port}' server_address = f'{server_host}:{server_port}'
class Server(PacketStreamerServicer, ParserSource): class Server(PacketStreamerServicer, ParserSource):
@@ -258,27 +268,27 @@ async def open_android_netsim_controller_transport(
return await self.device.send_packet(packet) return await self.device.send_packet(packet)
async def StreamPackets(self, _request_iterator, context): def lease_sink(self, device):
if self.device:
return None
self.device = device
return self.parser.feed_data
def release_sink(self):
self.device = None
async def StreamPackets(self, request_iterator, context):
logger.debug('StreamPackets request') logger.debug('StreamPackets request')
# Check that we don't already have a device
if self.device:
logger.debug('Busy, already serving a device')
return PacketResponse(error='Busy')
# Instantiate a new device # Instantiate a new device
self.device = HciDevice(context, self.parser.feed_data) device = HciDevice(context, self)
# Wait for the device to terminate # Pump packets to/from the device
logger.debug('Waiting for device to terminate') logger.debug('Pumping device packets')
try: try:
await self.device.wait_for_termination() await device.pump()
except asyncio.CancelledError: finally:
logger.debug('Request canceled') logger.debug('Pump terminated')
self.device.terminate()
logger.debug('Device terminated')
self.device = None
server = Server() server = Server()
await server.start() await server.start()