I am new to reactive programming and I am reading Reactive Spring by Josh Long.
In Chapter 4 he introduces a small program to show how a non-blocking file read actually works.

    package rsb.io;

    import lombok.extern.log4j.Log4j2;
    import org.springframework.util.FileCopyUtils;

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.channels.CompletionHandler;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;

    @Log4j2
    class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> {

        private final ExecutorService executorService = Executors.newFixedThreadPool(10);

        private int bytesRead;

        private long position;

        private AsynchronousFileChannel fileChannel;

        private Consumer<Bytes> consumer;

        private Runnable finished;

        public void read(File file, Consumer<Bytes> c, Runnable finished) throws IOException {
            this.consumer = c;
            this.finished = finished;
            Path path = file.toPath();
            this.fileChannel = AsynchronousFileChannel.open(path,
                    Collections.singleton(StandardOpenOption.READ), this.executorService);
            ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE);
            this.fileChannel.read(buffer, position, buffer, this);
            while (this.bytesRead > 0) {
                this.position = this.position + this.bytesRead;
                this.fileChannel.read(buffer, this.position, buffer, this);
            }
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            this.bytesRead = result;
            if (this.bytesRead < 0) {
                this.finished.run();
                return;
            }
            buffer.flip();
            byte[] data = new byte[buffer.limit()];
            buffer.get(data);
            consumer.accept(Bytes.from(data, data.length));
            buffer.clear();
            this.position = this.position + this.bytesRead;
            this.fileChannel.read(buffer, this.position, buffer, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            log.error(exc);
        }
    }

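The class above depends on two types, `Reader` and `Bytes`, that I have not pasted. Going only by how they are used in this snippet, minimal stand-ins might look like the sketch below. These are my own assumptions inferred from the call sites (the `getBytes`/`getLength` accessors in particular are guesses), not the book's source, and they would need to live in the same `rsb.io` package as `Asynchronous`.

```java
import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;

// Stand-in inferred from the call `Bytes.from(data, data.length)`; not the book's code.
final class Bytes {

    private final byte[] bytes;
    private final int length;

    private Bytes(byte[] bytes, int length) {
        this.bytes = bytes;
        this.length = length;
    }

    // Factory method matching the way the class is used in completed(...).
    static Bytes from(byte[] bytes, int length) {
        return new Bytes(bytes, length);
    }

    // Hypothetical accessors; the snippet above never calls them.
    byte[] getBytes() {
        return bytes;
    }

    int getLength() {
        return length;
    }
}

// Stand-in inferred from the signature of read(...) in the Asynchronous class.
interface Reader {
    void read(File file, Consumer<Bytes> consumer, Runnable finished) throws IOException;
}
```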
I have several questions about how this code actually works under the hood:
- Why do we need a separate thread pool (`Executors.newFixedThreadPool(10)`)?
- Does the read I/O actually happen on a thread from that pool? If so, how is this different from spinning up a thread myself and doing the read there? If non-blocking just means using a different thread, why do I need NIO or non-blocking APIs at all?
- If the thread that does the actual read is waiting for data from the disk, what happens to that thread? Does it stay blocked?
- And if that thread is not blocked, why not have a single thread do all the work? Why spin up multiple threads?
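
To make the second question concrete, this is roughly the comparison I have in mind. It is only a sketch I put together, not code from the book: the class `ReadComparison`, its two helper methods, and the thread name `my-own-thread` are made up for illustration. One method does an ordinary blocking read on a thread I start myself; the other uses `AsynchronousFileChannel` with the same kind of fixed pool and records which thread ends up running the completion handler.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical comparison class; not from the book.
class ReadComparison {

    // Option 1: start a thread myself and do an ordinary blocking read on it.
    static String blockingReadOnOwnThread(Path path) throws Exception {
        CompletableFuture<String> threadName = new CompletableFuture<>();
        Thread t = new Thread(() -> {
            try {
                Files.readAllBytes(path); // this thread waits here until the read finishes
                threadName.complete(Thread.currentThread().getName());
            } catch (IOException e) {
                threadName.completeExceptionally(e);
            }
        }, "my-own-thread");
        t.start();
        return threadName.get();
    }

    // Option 2: hand the read to the channel; read(...) returns right away and
    // the handler is invoked later, on whichever thread the channel chooses.
    static String asyncReadWithPool(Path path) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CompletableFuture<String> threadName = new CompletableFuture<>();
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                path, Collections.singleton(StandardOpenOption.READ), pool)) {
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    threadName.complete(Thread.currentThread().getName());
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    threadName.completeExceptionally(exc);
                }
            });
            // Blocking here only so the demo can report the name before closing the channel.
            return threadName.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("read-comparison", ".txt");
        Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("blocking read ran on: " + blockingReadOnOwnThread(tmp));
        System.out.println("completion handler ran on: " + asyncReadWithPool(tmp));
        Files.delete(tmp);
    }
}
```

Printing the two thread names shows where my code runs in each case, but it does not tell me what the pool thread is doing while the disk is busy, which is what the last two questions are about.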
I know these questions are very basic, but I have yet to find a concrete answer to any of them.