When creating a global KTable using the Processor API and the addGlobalStore function, the resulting store populates fine. But subsequent attempts to iterate over the store's contents result in the following exception:
Exception in thread "main" java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:63)
    at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:26)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:208)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:189)
    at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:155)
    at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:113)
    at org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator.hasNext(DelegatingPeekingKeyValueIterator.java:63)
This is from simple code that iterates over the store:
ReadOnlyKeyValueStore<String, Config> store = this.globalStreams.store(
        "config-table", QueryableStoreTypes.<String, Config>keyValueStore());
final KeyValueIterator<String, Config> iterator = store.all();
final HashMap<String, Config> dynamicStreams = new HashMap<>();
while (iterator.hasNext()) {
    final KeyValue<String, Config> next = iterator.next();
    dynamicStreams.put(next.key, next.value);
}
iterator.close();
Any access to the entries in that store causes that exception. The same topic works fine if I create the global state store using the globalTable function (so this is not a deserialization problem). The state store also returns the number of rows successfully (so it is fully populated).
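For comparison, the working globalTable variant looks roughly like this (a minimal sketch, assuming the 0.10.2 KStreamBuilder.globalTable overload that takes explicit serdes; configSerdes is the same value serde used in the store creation below):

KStreamBuilder builder = new KStreamBuilder();
// GlobalKTable backed by the same "config" topic and "config-table" store name
GlobalKTable<String, Config> configTable = builder.globalTable(
        Serdes.String(),   // key serde
        configSerdes,      // value serde
        "config",          // source topic
        "config-table");   // queryable store name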
The global state store is created as follows (this is using Kafka Streams 0.10.2.1):
KStreamBuilder globalBuilder = new KStreamBuilder();

StateStoreSupplier<KeyValueStore<String, Config>> storeSupplier = Stores
        .create("config-table")
        .withKeys(Serdes.String())
        .withValues(configSerdes)
        .persistent()
        .build();

// a Processor that updates the store
ProcessorSupplier<String, Config> procSupplier = () -> new ConfigWorker();

globalBuilder.addGlobalStore(
        storeSupplier.get(),
        "config-table-source",
        new StringDeserializer(),
        configDeserializer,
        "config",
        "config-worker",
        procSupplier)
    .buildGlobalStateTopology();

this.globalStreams = new KafkaStreams(globalBuilder, this.getProperties());
globalStreams.start();
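ConfigWorker itself is not shown above; a hypothetical version, consistent with the constraints described in the edit below, might look like this (only the class name comes from the snippet above, everything else is an assumption):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class ConfigWorker implements Processor<String, Config> {

    private KeyValueStore<String, Config> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // look up the global store registered via addGlobalStore
        store = (KeyValueStore<String, Config>) context.getStateStore("config-table");
    }

    @Override
    public void process(final String key, final Config value) {
        store.put(key, value);  // only maintain the store from inbound records
    }

    @Override
    public void punctuate(final long timestamp) {
        // never invoked: schedule() is unsupported in a global context (see edit)
    }

    @Override
    public void close() { }
}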
Edit:
One problem was that the state store is logged by default, while global KTables are not backed by a changelog, so adding a .disableLogging() to the state store creation fixed this.
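A sketch of the corrected store creation, assuming the 0.10.2 Stores factory; it differs from the original only in the added disableLogging() call:

StateStoreSupplier<KeyValueStore<String, Config>> storeSupplier = Stores
        .create("config-table")
        .withKeys(Serdes.String())
        .withValues(configSerdes)
        .persistent()
        .disableLogging()   // global stores must not be change-logged
        .build();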
Another problem seemed to be the call to context.schedule() in the Processor's init() function, which was throwing an invalid-operation exception. Removing it fixed the code, but I guess punctuate() is now never called. It seems to work now, but it's not clear why I can't call schedule().
It seems that forwarding in a global context is not allowed, as per this code, so my global Processor just needs to process inbound updates and nothing more.