How to use sshd mina/netty with projectreactor

786 Views Asked by At

I want to develop a reactive Java application that needs to communicate with some external services via SSH. As the reactive framework I am using Spring Boot WebFlux (built on Project Reactor), and Apache MINA SSHD (sshd-netty) as the SSH client. Basically, the application will open an SSH session and run some commands on the server. Which command is run next depends on the responses to the previous commands.

The question is, how to integrate sshd mina into spring boot project reactor (Mono/Flux)?

sshd mina offers the possibility to use async response, as shown in the test: https://github.com/apache/mina-sshd/blob/c876cce935f9278b0d50f02fd554eff1af949060/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java#L560

but I don't know how to integrate that with Mono/Flux.

So far, I am able to get the response corresponding to the login, but not the response that follows after sending a command.

Here is my setup code

The test SSH server is created via Docker:

docker run -d --rm -e SUDO_ACCESS=false -e PASSWORD_ACCESS=true -e USER_NAME=username -e USER_PASSWORD=password -p 2222:2222 lscr.io/linuxserver/openssh-server:latest

the java project contains the following dependencies

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.sshd</groupId>
      <artifactId>sshd-netty</artifactId>
      <version>2.8.0</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-test</artifactId>
      <scope>test</scope>
    </dependency>

This is the SSH client code that I would like to make reactive. It is based on the MINA SSHD documentation (https://github.com/apache/mina-sshd) and on the only example of asynchronous client usage I could find (https://github.com/apache/mina-sshd/blob/master/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java#L518):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.channel.StreamingChannel;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoReadFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/**
 * SSH shell client for the docker test server, exposing its operations as Reactor {@link Mono}s.
 *
 * <p>Connection parameters are hard-coded constants. The shell channel is switched to
 * {@code StreamingChannel.Streaming.Async}, and reads/writes are bridged to {@code MonoSink}s by
 * the two nested listener classes.
 *
 * <p>NOTE(review): as written, {@link #execCommand} never delivers the command's response; see
 * the comment on that method.
 */
public class SshDocker implements AutoCloseable {
  private static final String hostname = "localhost";
  private static final String username = "username";
  private static final String password = "password";
  private static final int port = 2222;
  // Timeout passed to every verify(...) call made while opening the connection.
  private static final Duration clientTimeout = Duration.ofSeconds(10);
  private SshClient client;
  private ClientSession session;
  private ClientChannel channel;

  /**
   * Starts the client, connects, authenticates with the password and opens an async shell channel.
   *
   * <p>Connect, auth and channel-open are each awaited with {@code verify(clientTimeout)} directly
   * in this method, i.e. before the returned {@code Mono} is even created.
   *
   * @return a {@code Mono} that reads the first output of the shell up to the prompt
   * @throws IOException declared for the connect/auth/open calls
   */
  public Mono<String> open() throws IOException {
    client = SshClient.setUpDefaultClient();
    client.start();

    session = client.connect(username, hostname, port).verify(clientTimeout).getSession();
    session.addPasswordIdentity(password);
    session.auth().verify(clientTimeout);

    channel = session.createShellChannel();
    channel.setStreaming(StreamingChannel.Streaming.Async);
    channel.open().verify(clientTimeout);

    final Duration timeout = Duration.ofSeconds(10);
    return readResponse(timeout);
  }

  /** Closes the channel, then the session, then the client, in that order. */
  @Override
  public void close() throws Exception {
    channel.close();
    session.close();
    client.close();
  }

  /**
   * Writes {@code command} to the shell and then reads the response.
   *
   * <p>NOTE(review): {@code runCommand} returns a {@code Mono<Void>} whose listener calls
   * {@code handler.success()} without a value, so the mono completes empty and the
   * {@code flatMap} function below is never invoked. This matches the observed output
   * ("Operation Write Complete" with no response following it).
   *
   * @param command text written to the shell as-is (the caller supplies the trailing newline)
   * @param timeout timeout handed to the write and read listeners
   */
  public Mono<String> execCommand(final String command, final Duration timeout) {
    return runCommand(command, timeout).flatMap(v -> readResponse(timeout));
  }

  /**
   * Writes the command bytes to the channel's async input and completes when the write finishes.
   *
   * <p>{@code command.getBytes()} is called without a charset argument. An {@code IOException}
   * from {@code writeBuffer} is rethrown wrapped in a {@code RuntimeException}.
   */
  private Mono<Void> runCommand(final String command, final Duration timeout) {
    final IoOutputStream requestStream = channel.getAsyncIn();
    return Mono.create(
        voidMonoSink -> {
          final ReactiveMonoRequestListener reactiveMonoRequestListener =
              new ReactiveMonoRequestListener(timeout, voidMonoSink);
          try {
            requestStream
                .writeBuffer(new ByteArrayBuffer(command.getBytes()))
                .addListener(reactiveMonoRequestListener);
          } catch (final IOException e) {
            throw new RuntimeException(e);
          }
        });
  }

  /**
   * Starts a read on the channel's async output and emits the accumulated text once it ends with
   * the prompt suffix (see {@link ReactiveMonoResponseListener}).
   */
  private Mono<String> readResponse(final Duration timeout) {
    final IoInputStream responseStream = channel.getAsyncOut();
    return Mono.create(
        monoSink -> {
          final ReactiveMonoResponseListener reactiveResponseListener =
              new ReactiveMonoResponseListener(responseStream, timeout, monoSink);
          responseStream.read(new ByteArrayBuffer()).addListener(reactiveResponseListener);
        });
  }

  /**
   * Read listener that accumulates shell output and signals the sink once the accumulated text
   * ends with {@code "$ "}; until then it re-registers itself on a follow-up read.
   */
  public static class ReactiveMonoResponseListener implements SshFutureListener<IoReadFuture> {

    final IoInputStream responseStream;
    // Everything read so far for the current response.
    final ByteArrayOutputStream result = new ByteArrayOutputStream();
    private final Duration timeout;
    private final MonoSink<String> handler;

    public ReactiveMonoResponseListener(
        final IoInputStream responseStream,
        final Duration timeout,
        final MonoSink<String> handler) {
      this.responseStream = responseStream;
      this.timeout = timeout;
      this.handler = handler;
    }

    /**
     * Appends the bytes of the finished read to {@code result}; either issues another read with
     * the same buffer or, when the prompt suffix is seen, emits the text through the sink.
     * An {@code IOException} is forwarded as the sink's error. Does nothing beyond logging when
     * the handler is {@code null}.
     */
    @Override
    public void operationComplete(final IoReadFuture ioReadFuture) {
      System.out.println("Operation Read Complete");
      if (handler != null) {
        try {
          ioReadFuture.verify(timeout);
          final Buffer buffer = ioReadFuture.getBuffer();
          result.write(buffer.array(), buffer.rpos(), buffer.available());
          // Mark the copied bytes as consumed and compact so the buffer can be reused.
          buffer.rpos(buffer.rpos() + buffer.available());
          buffer.compact();
          if (!result.toString().endsWith("$ ")) { // read response until next prompt
            responseStream.read(buffer).addListener(this);
          } else {
            System.out.println("response >>>>>>>>");
            System.out.println(result);
            System.out.println("<<<<<<<< response");

            handler.success(result.toString());
          }
        } catch (final IOException e) {
          handler.error(e);
        }
      }
    }
  }

  /**
   * Write listener that completes the sink once the write future has been verified, or forwards
   * the {@code IOException} as the sink's error.
   */
  public static class ReactiveMonoRequestListener implements SshFutureListener<IoWriteFuture> {

    private final MonoSink<Void> handler;
    private final Duration timeout;

    public ReactiveMonoRequestListener(final Duration timeout, final MonoSink<Void> handler) {
      this.handler = handler;
      this.timeout = timeout;
    }

    @Override
    public void operationComplete(final IoWriteFuture ioWriteFuture) {
      System.out.println("Operation Write Complete");
      if (handler != null) {
        try {
          ioWriteFuture.verify(timeout);
          // Completes the sink WITHOUT emitting a value (empty completion).
          handler.success();
        } catch (final IOException e) {
          handler.error(e);
        }
      }
    }
  }
}

the test used to run the reactive ssh client

import java.time.Duration;
import org.junit.jupiter.api.Test;

class SshDockerTest {

  /**
   * Opens the shell, sends {@code ls} once the login response has arrived and prints whatever the
   * pipeline emits. The subscription is fire-and-forget: the method does not wait for a result.
   */
  @Test
  void run() throws Exception {
    final Duration commandTimeout = Duration.ofSeconds(3);
    final String listCommand = "ls\n";
    final SshDocker sshClient = new SshDocker();

    sshClient
        .open()
        .flatMap(loginResponse -> sshClient.execCommand(listCommand, commandTimeout))
        .subscribe(System.out::println);
  }
}

When running the test, besides the debug log, I obtain:

Operation Read Complete
response >>>>>>>>
Welcome to OpenSSH Server

65d098057769:~$
<<<<<<<< response
Operation Write Complete

but no sign of the response for the ls command

if it is not possible to transform that sshd mina into reactive, what would be an alternative reactive solution?

Thanks

1

There are 1 best solutions below

0
vivi On

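I finally found a working solution. The problem was that a Mono<Void> completes without ever emitting a value, so the function passed to flatMap is never called and the subsequent steps of the pipeline (here: reading the response) are never triggered. Emitting a value (Mono<Boolean>) instead fixes this.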

Here are the new changes:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.channel.StreamingChannel;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoReadFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;

// start a docker container
// docker run -d --rm -e SUDO_ACCESS=false -e PASSWORD_ACCESS=true -e USER_NAME=username -e
// USER_PASSWORD=password -p 2222:2222 lscr.io/linuxserver/openssh-server:latest

/**
 * Reactive wrapper around an Apache MINA SSHD interactive shell channel.
 *
 * <p>The blocking connection setup and teardown run on {@code Schedulers.boundedElastic()}.
 * Commands are written to, and responses read from, the channel's asynchronous streams; the two
 * nested listeners bridge the SSHD futures to Reactor {@link MonoSink}s.
 *
 * <p>A response is considered complete once the accumulated output ends with the shell prompt
 * suffix {@code "$ "}. Commands and responses are encoded/decoded as UTF-8.
 *
 * <p>Only one command should be in flight at a time: the class keeps a single pair of streams and
 * performs no synchronization of its own.
 */
public class SshDocker {
  private static final String hostname = "localhost";
  private static final String username = "username";
  private static final String password = "password";
  private static final int port = 2222;
  private static final Duration clientTimeout = Duration.ofSeconds(10);
  /** Trailing text of the shell prompt; marks the end of a response. */
  private static final String promptSuffix = "$ ";
  private IoInputStream responseStream;
  private IoOutputStream requestStream;
  private SshClient client;
  private ClientSession session;
  private ClientChannel channel;

  /**
   * Blocking connection setup: start client, connect, authenticate, open an async shell channel.
   *
   * <p>If any step fails, everything created so far is released again before the failure is
   * rethrown, so a failed open does not leave a started client behind.
   *
   * @return always {@code true}; a value is needed so that downstream {@code flatMap} is triggered
   * @throws IOException if connecting, authenticating or opening the channel fails or times out
   */
  private Boolean openNotReactive() throws IOException {
    try {
      client = SshClient.setUpDefaultClient();
      client.start();

      session = client.connect(username, hostname, port).verify(clientTimeout).getSession();
      session.addPasswordIdentity(password);
      session.auth().verify(clientTimeout);

      channel = session.createShellChannel();
      channel.setStreaming(StreamingChannel.Streaming.Async);
      channel.open().verify(clientTimeout);

      responseStream = channel.getAsyncOut();
      requestStream = channel.getAsyncIn();
      return true;
    } catch (final IOException | RuntimeException e) {
      try {
        releaseResources();
      } catch (final Exception closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  /**
   * Opens the SSH shell and reads the server's initial output.
   *
   * @return a {@code Mono} emitting everything the shell printed up to and including the first
   *     prompt
   */
  public Mono<String> open() {
    return Mono.fromCallable(this::openNotReactive)
        .subscribeOn(Schedulers.boundedElastic())
        .flatMap(opened -> readResponse(clientTimeout));
  }

  /**
   * Closes channel, session and client on a bounded-elastic thread.
   *
   * <p>Because this returns a publisher, it can serve as the asynchronous cleanup function of
   * {@code Mono.usingWhen(...)}, which guarantees the call on completion, error and cancellation.
   *
   * @return a {@code Mono} emitting {@code true} once everything has been closed
   */
  public Mono<Boolean> close() {
    return Mono.fromCallable(
            () -> {
              closeNotReactive();
              return true;
            })
        .subscribeOn(Schedulers.boundedElastic());
  }

  /** Blocking teardown; logs and then releases all SSH resources. */
  private void closeNotReactive() throws Exception {
    System.out.println("Closing");
    releaseResources();
  }

  /**
   * Closes channel, then session, then client.
   *
   * <p>try-with-resources closes in reverse declaration order, skips resources that are still
   * {@code null} (open never got that far) and keeps closing the remaining ones when one close
   * fails, attaching later failures as suppressed exceptions.
   */
  private void releaseResources() throws Exception {
    try (SshClient openClient = client;
        ClientSession openSession = session;
        ClientChannel openChannel = channel) {
      // Nothing to do here: the resources are closed when this block exits.
    }
  }

  /**
   * Sends a command to the shell and reads its response.
   *
   * @param command the command line; surrounding whitespace is stripped and a newline appended
   * @param timeout timeout applied to the write and to each read
   * @return a {@code Mono} emitting the shell output up to and including the next prompt
   */
  public Mono<String> execCommand(final String command, final Duration timeout) {
    return runCommand(command, timeout).flatMap(written -> readResponse(timeout)).log();
  }

  /**
   * Writes the command to the channel's async input.
   *
   * @return a {@code Mono} emitting {@code true} once the write has completed; an
   *     {@code IOException} from the write is signalled through the {@code Mono}
   */
  private Mono<Boolean> runCommand(final String command, final Duration timeout) {
    final String cmd = command.strip() + "\n";
    return Mono.create(
        monoSink -> {
          final ReactiveMonoRequestListener writeListener =
              new ReactiveMonoRequestListener(timeout, monoSink);
          try {
            final IoWriteFuture writeFuture =
                requestStream.writeBuffer(
                    new ByteArrayBuffer(cmd.getBytes(StandardCharsets.UTF_8)));
            writeFuture.addListener(writeListener);
            monoSink.onDispose(() -> writeFuture.removeListener(writeListener));
          } catch (final IOException e) {
            // Same signalling as the listeners: hand the IOException to the subscriber directly.
            monoSink.error(e);
          }
        });
  }

  /**
   * Starts reading from the channel's async output.
   *
   * @return a {@code Mono} emitting the accumulated output once it ends with the prompt suffix
   */
  private Mono<String> readResponse(final Duration timeout) {
    // https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#create-java.util.function.Consumer-
    return Mono.create(
        monoSink -> {
          final ReactiveMonoResponseListener readListener =
              new ReactiveMonoResponseListener(responseStream, timeout, monoSink);
          final IoReadFuture readFuture = responseStream.read(new ByteArrayBuffer());
          readFuture.addListener(readListener);
          monoSink.onDispose(() -> readFuture.removeListener(readListener));
        });
  }

  /**
   * Read listener that accumulates shell output until the prompt appears and then completes the
   * sink with the decoded text.
   *
   * <p>NOTE(review): follow-up reads registered from {@link #operationComplete} are not covered by
   * the {@code onDispose} hook installed in {@code readResponse}, which only detaches the listener
   * from the first read future.
   */
  public static class ReactiveMonoResponseListener implements SshFutureListener<IoReadFuture> {

    final IoInputStream responseStream;
    final ByteArrayOutputStream result = new ByteArrayOutputStream();
    private final Duration timeout;
    private final MonoSink<String> handler;

    /**
     * @param responseStream stream to issue follow-up reads on
     * @param timeout timeout used when verifying each completed read
     * @param handler sink to signal; when {@code null} the listener only logs
     */
    public ReactiveMonoResponseListener(
        final IoInputStream responseStream,
        final Duration timeout,
        final MonoSink<String> handler) {
      this.responseStream = responseStream;
      this.timeout = timeout;
      this.handler = handler;
    }

    /**
     * Appends the bytes of the finished read, then either reads on or emits the response.
     *
     * <p>Any failure, including a runtime exception from the follow-up read, is forwarded to the
     * sink so that the subscriber is never left waiting on a dead pipeline.
     */
    @Override
    public void operationComplete(final IoReadFuture ioReadFuture) {
      System.out.println("Operation Read Complete");
      if (handler == null) {
        return;
      }
      try {
        ioReadFuture.verify(timeout);
        final Buffer buffer = ioReadFuture.getBuffer();
        result.write(buffer.array(), buffer.rpos(), buffer.available());
        // Mark the copied bytes as consumed and compact so the buffer can be reused.
        buffer.rpos(buffer.rpos() + buffer.available());
        buffer.compact();

        // Decode the whole accumulation so multi-byte characters split across reads are intact.
        final String text = result.toString(StandardCharsets.UTF_8);
        if (!text.endsWith(promptSuffix)) { // read response until next prompt
          responseStream.read(buffer).addListener(this);
        } else {
          System.out.println("response mono >>>>>>>>");
          System.out.println(text);
          System.out.println("<<<<<<<< response mono");

          handler.success(text);
        }
      } catch (final IOException | RuntimeException e) {
        handler.error(e);
      }
    }
  }

  /**
   * Write listener that emits {@code true} through the sink once the write has been verified.
   *
   * <p>A value (rather than an empty completion) is emitted on purpose: downstream
   * {@code flatMap} is only invoked for emitted values.
   */
  public static class ReactiveMonoRequestListener implements SshFutureListener<IoWriteFuture> {

    private final MonoSink<Boolean> handler;
    private final Duration timeout;

    /**
     * @param timeout timeout used when verifying the completed write
     * @param handler sink to signal; when {@code null} the listener only logs
     */
    public ReactiveMonoRequestListener(final Duration timeout, final MonoSink<Boolean> handler) {
      this.handler = handler;
      this.timeout = timeout;
    }

    /** Verifies the write and signals success or the failure to the sink. */
    @Override
    public void operationComplete(final IoWriteFuture ioWriteFuture) {
      System.out.println("Operation Write Complete");
      if (handler == null) {
        return;
      }
      try {
        ioWriteFuture.verify(timeout);
        handler.success(true);
      } catch (final IOException | RuntimeException e) {
        handler.error(e);
      }
    }
  }
}

and the "test" to run the application:

import java.time.Duration;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class SshDockerTest {

  /**
   * Runs {@code ls} and {@code pwd} over one SSH shell session.
   *
   * <p>The client is managed with {@code Mono.usingWhen}, so {@code close()} is invoked when the
   * pipeline completes, fails or is cancelled; chaining {@code close()} as a last
   * {@code flatMap} step would skip it whenever {@code open} or a command fails.
   */
  @Test
  void run() {
    final Duration commandTimeout = Duration.ofSeconds(3);
    final Mono<String> lastResponse =
        Mono.usingWhen(
            Mono.fromSupplier(SshDocker::new),
            sshClient ->
                sshClient
                    .open()
                    .flatMap(r -> sshClient.execCommand("ls", commandTimeout))
                    .flatMap(r -> sshClient.execCommand("pwd", commandTimeout)),
            SshDocker::close);

    // A response is only emitted once the output ends with the shell prompt.
    StepVerifier.create(lastResponse)
        .expectNextMatches(response -> response.endsWith("$ "))
        .verifyComplete();
  }
}

Is it possible to use AutoCloseable with Reactor so that I don't need to pay too much attention to manually calling the close method?

If you see any improvement, please let me know.