How to program a hazelcast jet source which asynchronously receives data

229 Views Asked by At

I have been going through the hazelcast-jet documentation to find references to sources which are being fed asynchronously by some external process - which in my case would be http posts.

I did look at the Kafka code as this seems to be the closest, but can't figure out on how a newly arriving event would trigger anything. I assume, there won't be a blocking thread involved here.

I would appreciate any pointers to better understand on how I could use hazelcast jet in an environment where the 'stream' elements are being drip fed.

1

There are 1 best solutions below

0
On

The upcoming version 0.7 of Hazelcast Jet introduces the Source Builder object, which makes it simpler to build a custom source. You can use it to write code similar to this:

public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    StreamSource<String> source = SourceBuilder
            .timestampedStream("http-trickle", x -> new HttpSource())
            .fillBufferFn(HttpSource::addToBuffer)
            .destroyFn(HttpSource::destroy)
            .build();
    StreamStage<String> srcStage = pipeline.drawFrom(source);
}

private static class HttpSource {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
    private final ArrayList<String> buffer = new ArrayList<>();
    private final AsyncClient<String> client = 
        new AsyncClient<String>().addReceiveListener(queue::add);

    void addToBuffer(TimestampedSourceBuffer<String> sourceBuffer) {
        queue.drainTo(buffer);
        for (String line : buffer) {
            sourceBuffer.add(line, extractTimestamp(line));
        }
        buffer.clear();
    }

    void destroy() {
        client.close();
    }
}

Here I've used a mock AsyncClient that should represent your actual async HTTP client. It expects you to provide a callback that will handle incoming data when it arrives. Jet's source builder asks you for another callback, fillBufferFn, that emits the data to the processing pipeline.

Your callback to the AsyncClient should push the data to a concurrent queue and your fillBufferFn should drain the queue to Jet's source buffer.

You might be tempted to simplify the code I gave to this:

void addToBufferDirect(TimestampedSourceBuffer<String> sourceBuffer) {
    for (String line; (line = queue.poll()) != null;) {
        sourceBuffer.add(line, extractTimestamp(line));
    }
}

This avoids the intermediate buffer standing between the concurrent queue and Jet's source buffer. It would actually work most of the time, but if you ever experience a strong traffic peak, addToBufferDirect may never complete. This would violate the contract with Jet, which requires you to return from fillBufferFn within a second or so.

We already recognized that this pattern of using the source builder with async client APIs is very common and we plan to provide more convenience to handle it.