diff --git a/bumble/transport/android_emulator.py b/bumble/transport/android_emulator.py index 8621f562..c941c193 100644 --- a/bumble/transport/android_emulator.py +++ b/bumble/transport/android_emulator.py @@ -97,10 +97,14 @@ async def open_android_emulator_transport(spec: Optional[str]) -> Transport: raise ValueError('invalid mode') # Create the transport object - transport = PumpedTransport( + class EmulatorTransport(PumpedTransport): + async def close(self): + await super().close() + await channel.close() + + transport = EmulatorTransport( PumpedPacketSource(hci_device.read), - PumpedPacketSink(hci_device.write), - channel.close, + PumpedPacketSink(hci_device.write) ) transport.start() diff --git a/bumble/transport/android_netsim.py b/bumble/transport/android_netsim.py index 55f2502e..283225ea 100644 --- a/bumble/transport/android_netsim.py +++ b/bumble/transport/android_netsim.py @@ -74,14 +74,20 @@ def get_ini_dir() -> Optional[pathlib.Path]: # ----------------------------------------------------------------------------- -def find_grpc_port() -> int: +def ini_file_name(instance_number: int) -> str: + suffix = f'_{instance_number}' if instance_number > 0 else '' + return f'netsim{suffix}.ini' + + +# ----------------------------------------------------------------------------- +def find_grpc_port(instance_number: int) -> int: if not (ini_dir := get_ini_dir()): logger.debug('no known directory for .ini file') return 0 - ini_file = ini_dir / 'netsim.ini' + ini_file = ini_dir / ini_file_name(instance_number) + logger.debug(f'Looking for .ini file at {ini_file}') if ini_file.is_file(): - logger.debug(f'Found .ini file at {ini_file}') with open(ini_file, 'r') as ini_file_data: for line in ini_file_data.readlines(): if '=' in line: @@ -90,12 +96,14 @@ def find_grpc_port() -> int: logger.debug(f'gRPC port = {value}') return int(value) + logger.debug('no grpc.port property found in .ini file') + # Not found return 0 # ----------------------------------------------------------------------------- -def publish_grpc_port(grpc_port) -> bool: +def publish_grpc_port(grpc_port: int, instance_number: int = 0) -> bool: if not (ini_dir := get_ini_dir()): logger.debug('no known directory for .ini file') return False @@ -104,7 +112,7 @@ def publish_grpc_port(grpc_port) -> bool: logger.debug('ini directory does not exist') return False - ini_file = ini_dir / 'netsim.ini' + ini_file = ini_dir / ini_file_name(instance_number) try: ini_file.write_text(f'grpc.port={grpc_port}\n') logger.debug(f"published gRPC port at {ini_file}") @@ -129,7 +137,8 @@ async def open_android_netsim_controller_transport( if server_host == '_' or not server_host: server_host = 'localhost' - if not publish_grpc_port(server_port): + instance_number = int(options.get('instance', "0")) + if not publish_grpc_port(server_port, instance_number): logger.warning("unable to publish gRPC port") class HciDevice: @@ -186,15 +195,12 @@ async def open_android_netsim_controller_transport( logger.debug(f'<<< PACKET: {data.hex()}') self.on_data_received(data) - def send_packet(self, data): - async def send(): - await self.context.write( - PacketResponse( - hci_packet=HCIPacket(packet_type=data[0], packet=data[1:]) - ) + async def send_packet(self, data): + return await self.context.write( + PacketResponse( + hci_packet=HCIPacket(packet_type=data[0], packet=data[1:]) ) - - self.loop.create_task(send()) + ) def terminate(self): self.task.cancel() @@ -228,17 +234,17 @@ async def open_android_netsim_controller_transport( logger.debug('gRPC server cancelled') await self.grpc_server.stop(None) - def on_packet(self, packet): + async def send_packet(self, packet): if not self.device: logger.debug('no device, dropping packet') return - self.device.send_packet(packet) + return await self.device.send_packet(packet) async def StreamPackets(self, _request_iterator, context): logger.debug('StreamPackets request') - # Check that we won't already have a device + # Check that we don't already have a device if self.device: logger.debug('busy, already serving a device') return PacketResponse(error='Busy') @@ -261,11 +267,9 @@ async def open_android_netsim_controller_transport( await server.start() asyncio.get_running_loop().create_task(server.serve()) - class GrpcServerTransport(Transport): - async def close(self): - await super().close() - - return GrpcServerTransport(server, server) + sink = PumpedPacketSink(server.send_packet) + sink.start() + return Transport(server, sink) # ----------------------------------------------------------------------------- @@ -308,6 +312,7 @@ async def open_android_netsim_host_transport(server_host, server_port, options): name = options.get('name', DEFAULT_NAME) manufacturer = DEFAULT_MANUFACTURER + instance_number = int(options.get('instance', "0")) if server_host == '_' or not server_host: server_host = 'localhost' @@ -315,7 +320,7 @@ async def open_android_netsim_host_transport(server_host, server_port, options): if not server_port: # Look for the gRPC config in a .ini file server_host = 'localhost' - server_port = find_grpc_port() + server_port = find_grpc_port(instance_number) if not server_port: raise RuntimeError('gRPC server port not found') @@ -334,10 +339,14 @@ async def open_android_netsim_host_transport(server_host, server_port, options): await hci_device.start() # Create the transport object - transport = PumpedTransport( + class GrpcTransport(PumpedTransport): + async def close(self): + await super().close() + await channel.close() + + transport = GrpcTransport( PumpedPacketSource(hci_device.read), PumpedPacketSink(hci_device.write), - channel.close, ) transport.start() @@ -359,6 +368,11 @@ async def open_android_netsim_transport(spec): to connect *to* a netsim server (netsim is the controller), or accept connections *as* a netsim-compatible server. + instance= + Specifies an instance number, with > 0. This is used to determine which + .init file to use. In `host` mode, it is ignored when the : + specifier is present, since in that case no .ini file is used. + In `host` mode: The : part is optional. When not specified, the transport looks for a netsim .ini file, from which it will read the `grpc.backend.port` diff --git a/bumble/transport/common.py b/bumble/transport/common.py index c0303086..2786a75e 100644 --- a/bumble/transport/common.py +++ b/bumble/transport/common.py @@ -339,8 +339,9 @@ class PumpedPacketSource(ParserSource): try: packet = await self.receive_function() self.parser.feed_data(packet) - except asyncio.exceptions.CancelledError: + except asyncio.CancelledError: logger.debug('source pump task done') + self.terminated.set_result(None) break except Exception as error: logger.warning(f'exception while waiting for packet: {error}') @@ -370,7 +371,7 @@ class PumpedPacketSink: try: packet = await self.packet_queue.get() await self.send_function(packet) - except asyncio.exceptions.CancelledError: + except asyncio.CancelledError: logger.debug('sink pump task done') break except Exception as error: @@ -393,19 +394,13 @@ class PumpedTransport(Transport): self, source: PumpedPacketSource, sink: PumpedPacketSink, - close_function, ) -> None: super().__init__(source, sink) - self.close_function = close_function def start(self) -> None: self.source.start() self.sink.start() - async def close(self) -> None: - await super().close() - await self.close_function() - # ----------------------------------------------------------------------------- class SnoopingTransport(Transport): diff --git a/bumble/transport/ws_client.py b/bumble/transport/ws_client.py index b8946ee2..902001e4 100644 --- a/bumble/transport/ws_client.py +++ b/bumble/transport/ws_client.py @@ -38,10 +38,14 @@ async def open_ws_client_transport(spec: str) -> Transport: websocket = await websockets.client.connect(spec) - transport = PumpedTransport( + class WsTransport(PumpedTransport): + async def close(self): + await super().close() + await websocket.close() + + transport = WsTransport( PumpedPacketSource(websocket.recv), PumpedPacketSink(websocket.send), - websocket.close, ) transport.start() return transport