Infinispan 9 - Server exception when trying to put an object in cache using Hotrod client and Protobuf


I have a problem using Infinispan 9.4.0/9.4.1 with the Hot Rod client and Protobuf marshalling, which I need in order to be able to use queries.

This is my cache configuration:

<cache-container name="clustered" default-cache="RequestIndexed" statistics="false">
    <transport channel="cluster" lock-timeout="1000"/>
    <global-state/>
    <modules>
        <module name="deployment.infinispan-module.jar"/>
    </modules>
    <replicated-cache name="RequestIndexedIndexLockingCache" remote-timeout="3000" statistics-available="false">
        <indexing index="NONE"/>
    </replicated-cache>
    <replicated-cache name="RequestIndexedIndexDataCache" remote-timeout="3000" statistics-available="false">
        <indexing index="NONE"/>
    </replicated-cache>
    <replicated-cache name="RequestIndexedIndexMetadataCache" remote-timeout="3000" statistics-available="false">
        <indexing index="NONE"/>
    </replicated-cache>
    <distributed-cache name="RequestIndexed" remote-timeout="3000" statistics-available="false">
        <memory>
            <object size="20" strategy="LRU"/>
        </memory>
        <compatibility enabled="true"/>
        <file-store path="system-store" passivation="true"/>
        <indexing index="PRIMARY_OWNER">
            <property name="default.indexmanager">
                org.infinispan.query.indexmanager.InfinispanIndexManager
            </property>
            <property name="default.locking_cachename">
                RequestIndexedIndexLockingCache
            </property>
            <property name="default.data_cachename">
                RequestIndexedIndexDataCache
            </property>
            <property name="default.metadata_cachename">
                RequestIndexedIndexMetadataCache
            </property>
        </indexing>
        <state-transfer timeout="60000" chunk-size="1024"/>
    </distributed-cache>
</cache-container>

My entity

public class EntityDemo implements Serializable {
    /** Class serial version UID. */
    private static final long serialVersionUID = 1L;

    private long id;

    private String name;

    private String value;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public String getKey() {
        return String.valueOf(id);
    }
}

The proto file

package test.infinispan.entity;

option indexed_by_default = false;

message EntityDemo {
    /*@Field*/
    required int64 id = 1;
    /*@Field*/
    required string name = 2;
    optional string value = 3;
}

The marshaller

public class EntityDemoMarshaller implements Serializable, MessageMarshaller<EntityDemo> {
    /** Class serial version UID. */
    private static final long serialVersionUID = 1L;

    @Override
    public Class<? extends EntityDemo> getJavaClass() {
        return EntityDemo.class;
    }

    @Override
    public String getTypeName() {
        return "test.infinispan.entity.EntityDemo";
    }

    @Override
    public EntityDemo readFrom(ProtoStreamReader reader) throws IOException {
        final EntityDemo ed = new EntityDemo();
        ed.setId(reader.readLong("id"));
        ed.setName(reader.readString("name"));
        ed.setValue(reader.readString("value"));
        return null;
    }

    @Override
    public void writeTo(ProtoStreamWriter writer, EntityDemo ed) throws IOException {
        writer.writeLong("id", ed.getId());
        writer.writeString("name", ed.getName());
        writer.writeString("value", ed.getValue());
    }
}

The cache configuration

<distributed-cache name="core.request" remote-timeout="3000" statistics-available="false">
    <memory>
        <object size="20" strategy="LRU"/>
    </memory>
    <file-store path="cache-store" passivation="true"/>
    <indexing index="NONE"/>
    <state-transfer timeout="60000" chunk-size="1024"/>
    <encoding>
        <key media-type="application/x-jboss-marshalling"/>
        <value media-type="application/x-jboss-marshalling"/>
    </encoding>
</distributed-cache>

My connection to Infinispan

ConfigurationBuilder cbi = new ConfigurationBuilder();
cbi.addServers("localhost:11222");
cbi.marshaller(new ProtoStreamMarshaller());
RemoteCacheManager rcmi = new RemoteCacheManager(cbi.build());

SerializationContext sc = ProtoStreamMarshaller.getSerializationContext(rcmi);
FileDescriptorSource fds = new FileDescriptorSource();
fds.addProtoFiles("/proto/EntityDemo.proto");
sc.registerProtoFiles(fds);
sc.registerMarshaller(new EntityDemoMarshaller());

RemoteCache<String, String> metadataCache = rcmi.getCache(ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME);
metadataCache.put("/proto/EntityDemo.proto", readProtoFile("/proto/EntityDemo.proto"));

RemoteCache<String, EntityDemo> rci = rcmi.getCache("RequestIndexed");
rci.clear();
EntityDemo ei = new EntityDemo();
ei.setId(1);
ei.setName("DemoIndexed");
ei.setValue("DemoIndexed");
rci.put(ei.getKey(), ei);

QueryFactory qf = Search.getQueryFactory(rci);
Query q = qf.from(EntityDemo.class)
        .having("id").gte(1)
        .build();
q.list().stream().forEach(v -> System.out.println(v));

I packaged my entity in a jar file (infinispan-module.jar) and then deployed that jar in Infinispan.

When I try to put an object in the cache, I get the following exception on the server:

[Server:instance-one] 13:48:17,477 ERROR [stderr] (HotRod-ServerHandler-4-10) Exception in thread "HotRod-ServerHandler-4-10" java.lang.IllegalArgumentException: No marshaller registered for test.infinispan.entity.EntityDemo
[Server:instance-one] 13:48:17,478 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.protostream.impl.SerializationContextImpl.getMarshallerDelegate(SerializationContextImpl.java:276)
[Server:instance-one] 13:48:17,479 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.protostream.WrappedMessage.readMessage(WrappedMessage.java:379)
[Server:instance-one] 13:48:17,479 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.protostream.ProtobufUtil.fromWrappedByteArray(ProtobufUtil.java:165)
[Server:instance-one] 13:48:17,480 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.protostream.ProtobufUtil.fromWrappedByteArray(ProtobufUtil.java:160)
[Server:instance-one] 13:48:17,480 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.query.remote.impl.dataconversion.ProtostreamObjectTranscoder.transcode(ProtostreamObjectTranscoder.java:42)
[Server:instance-one] 13:48:17,481 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.encoding.DataConversion.toStorage(DataConversion.java:214)
[Server:instance-one] 13:48:17,481 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.cache.impl.EncoderCache.valueToStorage(EncoderCache.java:111)
[Server:instance-one] 13:48:17,482 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.cache.impl.EncoderCache.putAsync(EncoderCache.java:460)
[Server:instance-one] 13:48:17,482 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.cache.impl.AbstractDelegatingAdvancedCache.putAsync(AbstractDelegatingAdvancedCache.java:386)
[Server:instance-one] 13:48:17,482 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.server.hotrod.CacheRequestProcessor.putInternal(CacheRequestProcessor.java:194)
[Server:instance-one] 13:48:17,483 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.server.hotrod.CacheRequestProcessor.lambda$put$6(CacheRequestProcessor.java:187)
[Server:instance-one] 13:48:17,484 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.server.hotrod.CacheRequestProcessor$$Lambda$745/1582212530.run(Unknown Source)
[Server:instance-one] 13:48:17,484 ERROR [stderr] (HotRod-ServerHandler-4-10)   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[Server:instance-one] 13:48:17,484 ERROR [stderr] (HotRod-ServerHandler-4-10)   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[Server:instance-one] 13:48:17,484 ERROR [stderr] (HotRod-ServerHandler-4-10)   at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[Server:instance-one] 13:48:17,485 ERROR [stderr] (HotRod-ServerHandler-4-10)   at java.lang.Thread.run(Thread.java:745)

Can anyone help me with this? I have been searching for a solution, but I can't find anything.

UPDATE 2018-11-07

I have a new problem.

I need to use listeners with filters and converters. When I use only filters, everything is OK, but when I add a converter I get an exception on the server side. I just want to receive the object that was put in the cache in the custom CREATE event.

The filter class

public class RequestEventFilter implements Serializable, CacheEventFilter<String, EntityDemo> {
    /** Class serial version UID. */
    private static final long serialVersionUID = 1L;

    private final long filter;

    public RequestEventFilter(Object[] params) {
        this.filter = (Long) params[0];
    }

    @Override
    public boolean accept(String key, EntityDemo oldValue, Metadata oldMetadata, EntityDemo newValue, Metadata newMetadata, EventType eventType) {
        if (eventType.isCreate()) {
            if (newValue.getId() % filter == 0)
                return true;
        }
        return false;
    }
}

The filter factory class

@NamedFactory(name="request-event-filter-factory")
public class RequestEventFilterFactory implements CacheEventFilterFactory {

    public RequestEventFilterFactory() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public CacheEventFilter<String, EntityDemo> getFilter(Object[] params) {
        return new RequestEventFilter(params);
    }
}

The converter class

public class RequestEventConverter implements Serializable, CacheEventConverter<String, EntityDemo, EntityDemo> {
    /** Class serial version UID. */
    private static final long serialVersionUID = 1L;

    public RequestEventConverter() {
    }

    @Override
    public EntityDemo convert(String key, EntityDemo oldValue, Metadata oldMetadata, EntityDemo newValue, Metadata newMetadata, EventType eventType) {
        if (newValue != null)
            return newValue;
        else
            return oldValue;
    }

}

The converter factory

@NamedFactory(name="request-event-converter-factory")
public class RequestEventConverterFactory implements CacheEventConverterFactory {

    public RequestEventConverterFactory() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public CacheEventConverter<String, EntityDemo, EntityDemo> getConverter(Object[] params) {
        return new RequestEventConverter();
    }
}

My Listener

@ClientListener(filterFactoryName="request-event-filter-factory", converterFactoryName="request-event-converter-factory", includeCurrentState = true, useRawData=false)
private class Listener {

    public Listener() {
    }

    @ClientCacheEntryCreated
    public void entryCreated(ClientCacheEntryCustomEvent<EntityDemo> event) {
        System.out.println("Entry created!");
        System.out.println(event.getEventData());
    }
}

I add the listener with this code

this.cache.addClientListener(new Listener(), new Object[] {2L}, new Object[] {});

When I put a new object into the Infinispan cache, I get the following exception:

[Server:instance-one] 19:08:56,053 ERROR [org.infinispan.interceptors.impl.InvocationContextInterceptor] (HotRod-ServerHandler-4-103) ISPN000136: Error executing command PutKeyValueCommand, writing keys [WrappedByteArray{bytes=[B0x033E023134, hashCode=33250249}]: org.infinispan.commons.CacheListenerException: ISPN000280: Caught exception [java.lang.ClassCastException] while invoking method [public void org.infinispan.server.hotrod.ClientListenerRegistry$BaseClientEventSender.onCacheEvent(org.infinispan.notifications.cachelistener.event.CacheEntryEvent)] on listener instance: org.infinispan.server.hotrod.ClientListenerRegistry$StatefulClientEventSender@72f19221
[Server:instance-one]   at org.infinispan.notifications.impl.AbstractListenerImpl$ListenerInvocationImpl.lambda$invoke$1(AbstractListenerImpl.java:387)
[Server:instance-one]   at org.infinispan.notifications.impl.AbstractListenerImpl$ListenerInvocationImpl$$Lambda$560/2000982649.run(Unknown Source)
[Server:instance-one]   at org.infinispan.util.concurrent.WithinThreadExecutor.execute(WithinThreadExecutor.java:20)
[Server:instance-one]   at org.infinispan.notifications.impl.AbstractListenerImpl$ListenerInvocationImpl.invoke(AbstractListenerImpl.java:404)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl$BaseCacheEntryListenerInvocation.doRealInvocation(CacheNotifierImpl.java:1689)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl$ClusteredListenerInvocation.doRealInvocation(CacheNotifierImpl.java:1586)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl$BaseCacheEntryListenerInvocation.invokeNoChecks(CacheNotifierImpl.java:1680)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl$BaseCacheEntryListenerInvocation.invoke(CacheNotifierImpl.java:1654)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl.notifyCacheEntryCreated(CacheNotifierImpl.java:395)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.NotifyHelper.entryCommitted(NotifyHelper.java:46)
[Server:instance-one]   at org.infinispan.interceptors.locking.ClusteringDependentLogic$DistributionLogic.commitSingleEntry(ClusteringDependentLogic.java:576)
[Server:instance-one]   at org.infinispan.interceptors.locking.ClusteringDependentLogic$AbstractClusteringDependentLogic.commitEntry(ClusteringDependentLogic.java:190)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitContextEntry(EntryWrappingInterceptor.java:584)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitEntryIfNeeded(EntryWrappingInterceptor.java:813)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitContextEntries(EntryWrappingInterceptor.java:566)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.applyChanges(EntryWrappingInterceptor.java:617)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.applyAndFixVersion(EntryWrappingInterceptor.java:678)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor$$Lambda$532/1400288933.accept(Unknown Source)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextThenAccept(BaseAsyncInterceptor.java:105)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.setSkipRemoteGetsAndInvokeNextForDataCommand(EntryWrappingInterceptor.java:672)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.visitPutKeyValueCommand(EntryWrappingInterceptor.java:302)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndFinally(BaseAsyncInterceptor.java:150)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor.lambda$nonTxLockAndInvokeNext$1(AbstractLockingInterceptor.java:299)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor$$Lambda$683/1667920547.apply(Unknown Source)
[Server:instance-one]   at org.infinispan.interceptors.SyncInvocationStage.addCallback(SyncInvocationStage.java:42)
[Server:instance-one]   at org.infinispan.interceptors.InvocationStage.andHandle(InvocationStage.java:65)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor.nonTxLockAndInvokeNext(AbstractLockingInterceptor.java:294)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor.visitNonTxDataWriteCommand(AbstractLockingInterceptor.java:126)
[Server:instance-one]   at org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor.visitDataWriteCommand(NonTransactionalLockingInterceptor.java:40)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor.visitPutKeyValueCommand(AbstractLockingInterceptor.java:82)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndHandle(BaseAsyncInterceptor.java:183)
[Server:instance-one]   at org.infinispan.statetransfer.StateTransferInterceptor.handleNonTxWriteCommand(StateTransferInterceptor.java:309)
[Server:instance-one]   at org.infinispan.statetransfer.StateTransferInterceptor.handleWriteCommand(StateTransferInterceptor.java:252)
[Server:instance-one]   at org.infinispan.statetransfer.StateTransferInterceptor.visitPutKeyValueCommand(StateTransferInterceptor.java:96)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:54)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.handleDefault(DDAsyncInterceptor.java:54)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.visitPutKeyValueCommand(DDAsyncInterceptor.java:60)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndExceptionally(BaseAsyncInterceptor.java:123)
[Server:instance-one]   at org.infinispan.interceptors.impl.InvocationContextInterceptor.visitCommand(InvocationContextInterceptor.java:90)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:56)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.handleDefault(DDAsyncInterceptor.java:54)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.visitPutKeyValueCommand(DDAsyncInterceptor.java:60)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.visitCommand(DDAsyncInterceptor.java:50)
[Server:instance-one]   at org.infinispan.interceptors.impl.AsyncInterceptorChainImpl.invokeAsync(AsyncInterceptorChainImpl.java:234)
[Server:instance-one]   at org.infinispan.cache.impl.CacheImpl.executeCommandAndCommitIfNeededAsync(CacheImpl.java:1930)
[Server:instance-one]   at org.infinispan.cache.impl.CacheImpl.putAsync(CacheImpl.java:1571)
[Server:instance-one]   at org.infinispan.cache.impl.DecoratedCache.putAsync(DecoratedCache.java:690)
[Server:instance-one]   at org.infinispan.cache.impl.AbstractDelegatingAdvancedCache.putAsync(AbstractDelegatingAdvancedCache.java:386)
[Server:instance-one]   at org.infinispan.cache.impl.EncoderCache.putAsync(EncoderCache.java:460)
[Server:instance-one]   at org.infinispan.cache.impl.AbstractDelegatingAdvancedCache.putAsync(AbstractDelegatingAdvancedCache.java:386)
[Server:instance-one]   at org.infinispan.server.hotrod.CacheRequestProcessor.putInternal(CacheRequestProcessor.java:194)
[Server:instance-one]   at org.infinispan.server.hotrod.CacheRequestProcessor.lambda$put$6(CacheRequestProcessor.java:187)
[Server:instance-one]   at org.infinispan.server.hotrod.CacheRequestProcessor$$Lambda$734/240795618.run(Unknown Source)
[Server:instance-one]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[Server:instance-one]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[Server:instance-one]   at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[Server:instance-one]   at java.lang.Thread.run(Thread.java:745)
[Server:instance-one] Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
[Server:instance-one]   at org.infinispan.server.hotrod.ClientListenerRegistry$BaseClientEventSender.onCacheEvent(ClientListenerRegistry.java:360)
[Server:instance-one]   at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
[Server:instance-one]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[Server:instance-one]   at java.lang.reflect.Method.invoke(Method.java:497)
[Server:instance-one]   at org.infinispan.notifications.impl.AbstractListenerImpl$ListenerInvocationImpl.lambda$invoke$1(AbstractListenerImpl.java:382)
[Server:instance-one]   ... 61 more

I have searched the Infinispan documentation, but I can't find anything.

Best answer

You don't need compatibility mode to use queries over Hot Rod, yet I can see your cache has <compatibility enabled="true"/>. Querying also does not require deploying the entities in the server.

Compatibility mode was deprecated in Infinispan 9.4.0; the alternative is described at http://infinispan.org/docs/stable/user_guide/user_guide.html#endpoint_interop.

Finally, if you really need to store unmarshalled objects in the cache, you need to deploy both the entities and the marshallers in the server. To deploy extra protobuf marshallers, see:

http://infinispan.org/docs/stable/user_guide/user_guide.html#protostream_deployment

To deploy entities, see:

http://infinispan.org/docs/stable/user_guide/user_guide.html#entities_deploy

Another answer

As the other answer already mentions, you do not need compat mode to run queries, so I would first try without it. Without compat mode you also no longer need to deploy entities and marshallers in the server, so everything is simpler and there is less room for mistakes. I can also see a small issue in EntityDemoMarshaller.readFrom: it returns null instead of the unmarshalled entity, which has the potential to ruin the whole thing anyway. Please fix that, then update this question with the new stack trace if it still fails, or close it if it is no longer an issue.
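
To make the readFrom issue concrete, here is a minimal sketch of the fix that runs with the JDK alone. FieldReader, ReadFromSketch and readerOf are hypothetical stand-ins invented for this illustration; they are not part of the ProtoStream API. In the real EntityDemoMarshaller the only change needed should be the last line of readFrom, returning ed instead of null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the reader handed to readFrom. It is NOT the
// ProtoStream API, just enough surface to run this sketch with the JDK alone.
interface FieldReader {
    Long readLong(String fieldName);
    String readString(String fieldName);
}

// Trimmed copy of the entity from the question.
class EntityDemo {
    private long id;
    private String name;
    private String value;

    long getId() { return id; }
    void setId(long id) { this.id = id; }
    String getName() { return name; }
    void setName(String name) { this.name = name; }
    String getValue() { return value; }
    void setValue(String value) { this.value = value; }
}

class ReadFromSketch {
    // Same body as EntityDemoMarshaller.readFrom, except for the last line.
    static EntityDemo readFrom(FieldReader reader) {
        final EntityDemo ed = new EntityDemo();
        ed.setId(reader.readLong("id"));
        ed.setName(reader.readString("name"));
        ed.setValue(reader.readString("value")); // optional field, may be null
        return ed; // the question's code returned null here
    }

    // Map-backed reader used only to exercise readFrom in this sketch.
    static FieldReader readerOf(Map<String, Object> fields) {
        return new FieldReader() {
            @Override
            public Long readLong(String fieldName) {
                return (Long) fields.get(fieldName);
            }

            @Override
            public String readString(String fieldName) {
                return (String) fields.get(fieldName);
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new HashMap<>();
        fields.put("id", 1L);
        fields.put("name", "DemoIndexed");
        fields.put("value", "DemoIndexed");

        EntityDemo ed = readFrom(readerOf(fields));
        System.out.println(ed.getId() + " " + ed.getName() + " " + ed.getValue());
    }
}
```

With return null left in place, every value read back from the cache or from a query result would come out as null on the client, even when the put itself succeeds.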

If your use case absolutely needs compat mode for whatever reason, then please read about transcoding in the user guide and use that instead, because compat mode is deprecated and is being replaced by transcoding.