How does this NIO read work under the hood?

277 Views Asked by At

I am new to reactive programming and I am reading Reactive Spring by Josh Long.

In Chapter 4 he introduces a small program to show how a non-blocking file read actually works.

package rsb.io;
import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@Log4j2
class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> {
  private final ExecutorService executorService = Executors.newFixedThreadPool(10);
  private int bytesRead;
  private long position;
  private AsynchronousFileChannel fileChannel;
  private Consumer<Bytes> consumer;
  private Runnable finished;
  public void read(File file, Consumer<Bytes> c, Runnable finished) throws IOException
{
  this.consumer = c;
  this.finished = finished;
  Path path = file.toPath(); 
  this.fileChannel = AsynchronousFileChannel.open(path,
  Collections.singleton(StandardOpenOption.READ), this.executorService); 
  ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE);
  this.fileChannel.read(buffer, position, buffer, this); 
  while (this.bytesRead > 0) {
  this.position = this.position + this.bytesRead;
  this.fileChannel.read(buffer, this.position, buffer, this);
  }
  }
  @Override
  public void completed(Integer result, ByteBuffer buffer) {
  this.bytesRead = result; 
  if (this.bytesRead < 0) {
  this.finished.run();
  return;
  }
  buffer.flip();
  byte[] data = new byte[buffer.limit()];
  buffer.get(data);
  
  consumer.accept(Bytes.from(data, data.length));
  buffer.clear();
  this.position = this.position + this.bytesRead;
  this.fileChannel.read(buffer, this.position, buffer, this);
  }
  @Override
  public void failed(Throwable exc, ByteBuffer attachment) {
  log.error(exc);
  }
}

I have a bunch of questions about how this code actually is working under the hood:

  1. Why do we need a separate thread pool ? (Executors.newFixedThreadPool(10))
  2. Is the reading IO operation actually happening on a thread from the thread pool? If so then what is the difference between me spinning up a thread and doing the read there? If non-blocking is just using a different thread then why do I need NIO or non-blocking APIs for that?
  3. If the thread that does the actual read is waiting for the data from disk then what will happen to that thread? Will it stay blocked?
  4. Again if the thread is not blocked then why not just have one thread to do all the work? Why spin up multiple threads?

I know these questions must be very basic but I am yet to get a concrete answer for any of them.

0

There are 0 best solutions below