Expose entire IMap over REST/WebFlux without excessive Heap utilisation


I have a distributed Hazelcast map (IMap) which is potentially very large.

I am tasked with returning the ENTIRE map values() collection as response to an HTTP GET request.

To minimise heap utilisation I plan to use Spring WebFlux and return an instance of Flux.

My concern is that invocation of IMap#values().iterator().next(), which is implicit in Flux.fromIterable(), might deserialize ALL values from ALL cluster members, thus blowing the heap of the JVM which is a Hazelcast client servicing the GET request.

If this concern is well-founded, then:

Would Hazelcast Jet provide a solution? I could create a Pipeline.withSource(IMap), but how would I create the sink as an instance of Flux which can be returned?

Many thanks, Robin.


Best answer

This concern is valid. There is in fact a query result size limit (see here), so for a large map the values() call will fail.

Jet isn't useful for a request-response scenario. It can handle large maps in a streaming way, but it delivers the map entries to a sink, not to the caller. You could perhaps hack around this, but it's not straightforward.
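One way to picture such a workaround is a bounded hand-off between whatever produces the entries (for example, a custom sink) and the thread answering the HTTP request. The sketch below uses only java.util.concurrent. QueueBridge, offer() and complete() are hypothetical names, not Jet API, and error propagation from the producer is deliberately left out. Because ArrayBlockingQueue.put() blocks while the queue is full, at most `capacity` entries are buffered regardless of the map size.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bridge (not a Jet API): a producer pushes items into a bounded
// queue and the request-handling side pulls them through a plain Iterator.
class QueueBridge<T> implements Iterator<T> {
    private static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue;
    private Object nextItem; // taken from the queue but not yet returned

    QueueBridge(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: blocks while the queue is full (items must be non-null).
    void offer(T item) throws InterruptedException {
        queue.put(item);
    }

    // Producer side: signals that no more items will follow.
    void complete() throws InterruptedException {
        queue.put(END);
    }

    @Override
    public boolean hasNext() {
        if (nextItem == null) {
            try {
                nextItem = queue.take(); // waits until the producer delivers
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return nextItem != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T item = (T) nextItem;
        nextItem = null;
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        QueueBridge<Integer> bridge = new QueueBridge<>(16);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 1_000; i++) {
                    bridge.offer(i);
                }
                bridge.complete();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        while (bridge.hasNext()) {
            sum += bridge.next();
        }
        producer.join();
        System.out.println("sum = " + sum);
    }
}
```

The consumer side is an ordinary Iterator, so it could be wrapped in an Iterable and streamed out; the hard part, as noted above, is wiring a real pipeline's output into offer() and complete().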

The upcoming Hazelcast 4.1 will include an SQL API, which would be the best fit for your use case: if you query the map using SQL, even large results can be streamed to the client with constant memory usage.
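The difference between materialising a whole result and streaming it with bounded memory can be sketched in plain Java. Here PagedIterator and the fetchPage(offset, limit) function are hypothetical stand-ins for a remote cursor, not Hazelcast's SQL API: the iterator keeps at most one page in memory and asks for the next page only when the current one is drained. An iterator of this shape is also what Flux.fromIterable() needs in order to stream without buffering everything, since Flux pulls elements through the iterator as the subscriber requests them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

// Hypothetical cursor-style iterator: pulls results one page at a time from a
// "remote" source, so at most pageSize values are held in memory at once.
class PagedIterator<T> implements Iterator<T> {
    private final BiFunction<Integer, Integer, List<T>> fetchPage; // (offset, limit) -> page
    private final int pageSize;
    private List<T> page = new ArrayList<>();
    private int position = 0; // index of the next element within `page`
    private int offset = 0;   // number of elements fetched so far
    private boolean lastPageSeen = false;

    PagedIterator(BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
        this.fetchPage = fetchPage;
        this.pageSize = pageSize;
    }

    @Override
    public boolean hasNext() {
        if (position < page.size()) {
            return true;
        }
        if (lastPageSeen) {
            return false;
        }
        // Current page drained: replace it with the next one (the old page becomes garbage).
        page = fetchPage.apply(offset, pageSize);
        offset += page.size();
        position = 0;
        lastPageSeen = page.size() < pageSize; // a short page means the source is exhausted
        return !page.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return page.get(position++);
    }

    public static void main(String[] args) {
        List<Integer> remote = new ArrayList<>();
        for (int i = 0; i < 1_000; i++) {
            remote.add(i);
        }
        // Simulated round trip: copy one slice of the "remote" data per call.
        Iterator<Integer> values = new PagedIterator<Integer>(
                (offset, limit) -> new ArrayList<>(
                        remote.subList(offset, Math.min(offset + limit, remote.size()))),
                50);
        long sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        System.out.println("sum = " + sum);
    }
}
```

By contrast, a values() call hands back a collection whose contents have already been transferred, so wrapping it in a Flux does not reduce the peak heap usage.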

As a workaround, you can look into the code behind the Jet map reader, ReadMapOrCacheP.java, which uses an internal API to read the map incrementally. Be aware, though, that this API is internal and unsupported, so it can be changed or removed in any release.

Another answer

As an addition to Oliv's answer: IMDG 4.0 introduced an internal iterator over the IMap contents that doesn't retrieve the entire map contents at once and should work well in the face of concurrent mutation and failures.

Still, using it is a bit involved, as we haven't exposed it via a public API yet. For the javadoc and the member-side implementation, see here.

Example code:

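import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.partition.Partition;

import java.util.Iterator;
import java.util.Map;

public class PartitionIteratorExample {

    public static void main(String[] args) {
        // start a three-member cluster inside this JVM
        HazelcastInstance i1 = Hazelcast.newHazelcastInstance();
        HazelcastInstance i2 = Hazelcast.newHazelcastInstance();
        HazelcastInstance i3 = Hazelcast.newHazelcastInstance();
        IMap<Integer, Integer> sampleMap = i1.getMap("map");

        // ingest
        for (int i = 0; i < 100; i++) {
            sampleMap.put(i, i);
        }

        int fetchSize = 10;            // entries fetched per round trip
        boolean prefetchValues = true; // fetch values together with keys in each batch

        // internal, unsupported API: may change or be removed in any release
        MapProxyImpl<Integer, Integer> mapImpl = (MapProxyImpl<Integer, Integer>) sampleMap;
        // in case you're reading from the client, use:
        // ClientMapProxy<Integer, Integer> mapImpl = (ClientMapProxy<Integer, Integer>) sampleMap;

        // read partition by partition
        for (Partition partition : i1.getPartitionService().getPartitions()) {
            Iterator<Map.Entry<Integer, Integer>> partitionIterator =
                    mapImpl.iterator(fetchSize, partition.getPartitionId(), prefetchValues);

            while (partitionIterator.hasNext()) {
                Map.Entry<Integer, Integer> next = partitionIterator.next();
                System.out.println("Fetched entry " + next);
            }
        }

        Hazelcast.shutdownAll();
    }
}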
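To connect this back to the question, the per-partition iterators can be concatenated lazily into a single Iterator, and an Iterable wrapping it can then be handed to Flux.fromIterable(). The sketch below is plain Java. PartitionChain is a hypothetical helper, and the IntFunction it takes stands in for something like partitionId -> mapImpl.iterator(fetchSize, partitionId, prefetchValues), assuming partition IDs run from 0 to partitionCount - 1. Each partition's iterator is created only after the previous partition has been drained, so only one partition's fetch buffer is in use at a time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

// Hypothetical helper: lazily concatenates one iterator per partition.
// The iterator for partition p is opened only after partition p - 1 is exhausted.
class PartitionChain<T> implements Iterator<T> {
    private final int partitionCount;
    private final IntFunction<Iterator<T>> openPartition; // e.g. p -> mapImpl.iterator(fetchSize, p, prefetchValues)
    private int nextPartition = 0;
    private Iterator<T> current = null;

    PartitionChain(int partitionCount, IntFunction<Iterator<T>> openPartition) {
        this.partitionCount = partitionCount;
        this.openPartition = openPartition;
    }

    @Override
    public boolean hasNext() {
        // Skip over empty partitions until one has data or none are left.
        while (current == null || !current.hasNext()) {
            if (nextPartition >= partitionCount) {
                return false;
            }
            current = openPartition.apply(nextPartition++);
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    public static void main(String[] args) {
        // In-memory stand-ins for three partitions (the middle one is empty).
        List<List<String>> partitions = List.of(List.of("a", "b"), List.of(), List.of("c"));
        Iterable<String> all =
                () -> new PartitionChain<String>(partitions.size(), p -> partitions.get(p).iterator());
        List<String> collected = new ArrayList<>();
        for (String s : all) {
            collected.add(s);
        }
        System.out.println(collected); // prints [a, b, c]
    }
}
```

With Reactor on the classpath, the hand-off would look roughly like Flux.fromIterable(() -> new PartitionChain<>(partitionCount, p -> mapImpl.iterator(fetchSize, p, prefetchValues))), followed by .map(Map.Entry::getValue) if only the values are wanted. Because the underlying iterator blocks on network round trips, it is probably wise to subscribe on a scheduler intended for blocking work, such as Reactor's Schedulers.boundedElastic(), via subscribeOn().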