Possible bug if doing parallelstream (distributed) computation on *EMPTY* Infinispan cache?

141 Views Asked by At

I'm a newbie to Infinispan and have been playing with it. I think I found a bug. If a cache is empty and we have two nodes running (and the nodes must actually be on separate virtual machines -- it can't just be 2 jvm processes), and then a 2nd process on node#1 does a parallel streaming operation as follows:

final List<String> max = c
  .entrySet()
  .parallelStream()
  .map(e ->  e.getKey().substring(0,1))
  .collect(() -> Collectors.toList());

You'll get the following error (the main error is I think " Invalid lambda deserialization"). NOTE: This error does NOT occur if the cache is populated with some data. I tried (reasonably hard) to trace the code, but couldn't figure out the issue, though I suspect it has something to do with deserializing an empty collector of some sort....Has anyone seen this or think this is a likely bug (as opposed to user error? I've tried many many things before posting....). Any workaround? (Or maybe it's a super easy "real" fix in the actual relevant code?)

Exception in thread "main" org.infinispan.remoting.RemoteException: ISPN000217: Received exception from proteowizard-dev2-16cpu-43102(rack-id=qa-rack, machine-id=qa-machine1), see cause for remote stack trace
at org.infinispan.remoting.transport.ResponseCollectors.wrapRemoteException(ResponseCollectors.java:27)

at org.infinispan.remoting.transport.ValidSingleResponseCollector.withException(ValidSingleResponseCollector.java:37)

at org.infinispan.remoting.transport.ValidSingleResponseCollector.addResponse(ValidSingleResponseCollector.java:21)

at org.infinispan.remoting.transport.impl.SingleTargetRequest.receiveResponse(SingleTargetRequest.java:52)

at org.infinispan.remoting.transport.impl.SingleTargetRequest.onResponse(SingleTargetRequest.java:35)

at org.infinispan.remoting.transport.impl.RequestRepository.addResponse(RequestRepository.java:52)

at org.infinispan.remoting.transport.jgroups.JGroupsTransport.processResponse(JGroupsTransport.java:1372)

at org.infinispan.remoting.transport.jgroups.JGroupsTransport.processMessage(JGroupsTransport.java:1275)

at org.infinispan.remoting.transport.jgroups.JGroupsTransport.access$300(JGroupsTransport.java:126)

at org.infinispan.remoting.transport.jgroups.JGroupsTransport$ChannelCallbacks.up(JGroupsTransport.java:1420)

at org.jgroups.JChannel.up(JChannel.java:816)

at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:893)

at org.jgroups.protocols.FRAG3.up(FRAG3.java:171)

at org.jgroups.protocols.FlowControl.up(FlowControl.java:343)

at org.jgroups.protocols.pbcast.GMS.up(GMS.java:873)

at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:240)

at org.jgroups.protocols.UNICAST3.deliverMessage(UNICAST3.java:1003)

at org.jgroups.protocols.UNICAST3.handleDataReceived(UNICAST3.java:729)

at org.jgroups.protocols.UNICAST3.up(UNICAST3.java:384)

at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:600)

at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:130)

at org.jgroups.protocols.FD_ALL.up(FD_ALL.java:203)

at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:253)

at org.jgroups.protocols.MERGE3.up(MERGE3.java:280)

at org.jgroups.protocols.Discovery.up(Discovery.java:269)

at org.jgroups.protocols.TP.passMessageUp(TP.java:1248)

at org.jgroups.util.SubmitToThreadPool$SingleMessageHandler.run(SubmitToThreadPool.java:87)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.IllegalStateException: Unexpected exception

at org.jboss.marshalling.reflect.JDKSpecific$SerMethods.callReadResolve(JDKSpecific.java:260)

at org.jboss.marshalling.reflect.SerializableClass.callReadResolve(SerializableClass.java:271)

at org.jboss.marshalling.river.RiverUnmarshaller.doReadNewObject(RiverUnmarshaller.java:1396)

at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:272)

at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:205)

at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41)

at org.infinispan.marshall.core.ExternalJBossMarshaller.objectFromObjectStream(ExternalJBossMarshaller.java:47)

at org.infinispan.marshall.core.GlobalMarshaller.readUnknown(GlobalMarshaller.java:873)

at org.infinispan.marshall.core.GlobalMarshaller.readNonNullableObject(GlobalMarshaller.java:697)

at org.infinispan.marshall.core.GlobalMarshaller.readNullableObject(GlobalMarshaller.java:361)

at org.infinispan.marshall.core.BytesObjectInput.readObject(BytesObjectInput.java:40)

at org.infinispan.stream.impl.intops.IntermediateOperationExternalizer.readObject(IntermediateOperationExternalizer.java:377)

at org.infinispan.stream.impl.intops.IntermediateOperationExternalizer.readObject(IntermediateOperationExternalizer.java:92)

at org.infinispan.marshall.core.GlobalMarshaller.readWithExternalizer(GlobalMarshaller.java:708)

at org.infinispan.marshall.core.GlobalMarshaller.readNonNullableObject(GlobalMarshaller.java:691)

at org.infinispan.marshall.core.GlobalMarshaller.readNullableObject(GlobalMarshaller.java:361)

at org.infinispan.marshall.core.BytesObjectInput.readObject(BytesObjectInput.java:40)

at org.infinispan.commons.marshall.MarshallUtil.lambda$unmarshallCollection$0(MarshallUtil.java:284)

at org.infinispan.commons.marshall.MarshallUtil.unmarshallCollection(MarshallUtil.java:267)

at org.infinispan.commons.marshall.MarshallUtil.unmarshallCollection(MarshallUtil.java:284)

at org.infinispan.marshall.exts.CollectionExternalizer.readObject(CollectionExternalizer.java:120)

at org.infinispan.marshall.exts.CollectionExternalizer.readObject(CollectionExternalizer.java:27)

at org.infinispan.marshall.core.GlobalMarshaller.readWithExternalizer(GlobalMarshaller.java:708)

at org.infinispan.marshall.core.GlobalMarshaller.readNonNullableObject(GlobalMarshaller.java:691)

at org.infinispan.marshall.core.GlobalMarshaller.readNullableObject(GlobalMarshaller.java:361)

at org.infinispan.marshall.core.BytesObjectInput.readObject(BytesObjectInput.java:40)

at org.infinispan.stream.impl.termop.TerminalOperationExternalizer.readObject(TerminalOperationExternalizer.java:192)

at org.infinispan.stream.impl.termop.TerminalOperationExternalizer.readObject(TerminalOperationExternalizer.java:42)

at org.infinispan.marshall.core.GlobalMarshaller.readWithExternalizer(GlobalMarshaller.java:708)

at org.infinispan.marshall.core.GlobalMarshaller.readNonNullableObject(GlobalMarshaller.java:691)

at org.infinispan.marshall.core.GlobalMarshaller.readNullableObject(GlobalMarshaller.java:361)

at org.infinispan.marshall.core.BytesObjectInput.readObject(BytesObjectInput.java:40)

at org.infinispan.stream.impl.StreamRequestCommand.readFrom(StreamRequestCommand.java:143)

at org.infinispan.marshall.exts.ReplicableCommandExternalizer.readCommandParameters(ReplicableCommandExternalizer.java:104)

at org.infinispan.marshall.exts.CacheRpcCommandExternalizer.readObject(CacheRpcCommandExternalizer.java:132)

at org.infinispan.marshall.exts.CacheRpcCommandExternalizer.readObject(CacheRpcCommandExternalizer.java:66)

at org.infinispan.marshall.core.GlobalMarshaller.readWithExternalizer(GlobalMarshaller.java:708)

at org.infinispan.marshall.core.GlobalMarshaller.readNonNullableObject(GlobalMarshaller.java:691)

at org.infinispan.marshall.core.GlobalMarshaller.readNullableObject(GlobalMarshaller.java:361)

at org.infinispan.marshall.core.GlobalMarshaller.objectFromObjectInput(GlobalMarshaller.java:194)

at org.infinispan.marshall.core.GlobalMarshaller.objectFromByteBuffer(GlobalMarshaller.java:223)

at org.infinispan.remoting.transport.jgroups.JGroupsTransport.processRequest(JGroupsTransport.java:1332)

at org.infinispan.remoting.transport.jgroups.JGroupsTransport.processMessage(JGroupsTransport.java:1272)

at org.infinispan.remoting.transport.jgroups.JGroupsTransport.access$300(JGroupsTransport.java:126)

at org.infinispan.remoting.transport.jgroups.JGroupsTransport$ChannelCallbacks.up(JGroupsTransport.java:1420)

at org.jgroups.JChannel.up(JChannel.java:816)

at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:893)

at org.jgroups.protocols.FRAG3.up(FRAG3.java:171)

at org.jgroups.protocols.FlowControl.up(FlowControl.java:351)

... 16 more

Caused by: java.lang.reflect.InvocationTargetException

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.jboss.marshalling.reflect.JDKSpecific$SerMethods.callReadResolve(JDKSpecific.java:250)

... 64 more

Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization

at com.deepdia.deepsearch.modules.infinispan.StartClient.$deserializeLambda$(StartClient.java:32)

... 74 more

Caused by: an exception which occurred:

in object of type java.lang.invoke.SerializedLambda

-> classloader hierarchy:

(Other details: Infinispan 9.4.1 embedded; OS --> Win Server 2016; oracle jdk8; the windows machines are running as VMs on Google Compute, but NOT kubernetes; jgroups was configured to use TCPPing with hardcoded hostnames)

Also, although I don't think it's pertinent, here's the cache definition:

 Configuration conf = new ConfigurationBuilder()
                .memory()
                .evictionStrategy(EvictionStrategy.REMOVE) //I think this is default
                .size(SIZE)
                .unsafe()
                .unreliableReturnValues(false)
                .clustering()
                .cacheMode(CacheMode.DIST_ASYNC)
                .hash()
                .numOwners(1)
                .numSegments(100)
                .capacityFactor(capacityFactor)
                .persistence()
                .passivation(true)
                .addStore(RocksDBStoreConfigurationBuilder.class)
                .location("C:\\Users\\gsaxena888\\Downloads\\temp" + vmNum + "\\data")
                .expiredLocation("C:\\Users\\gsaxena888\\Downloads\\temp\\" + vmNum + "\\expired")
                .segmented(true)
                .shared(false)
                .async()
                .enabled(true)
                .threadPoolSize(1)
                .modificationQueueSize(SIZE)
                .build();

        String newCacheName = "distributedWithL1";
        manager.defineConfiguration(newCacheName, conf);
        Cache</*StringHolderWorking*/String, /*StringHolderWorking*/String> c = manager.getCache(newCacheName);

And, although I don't think this either is pertinent, here's the ggroups xml file:

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups-4.0.xsd">
   <TCP bind_addr="match-address:10.*"
        bind_port="${jgroups.tcp.port:7800}"
        enable_diagnostics="false"
        thread_naming_pattern="pl"
        send_buf_size="640k"
        sock_conn_timeout="300"
        bundler_type="no-bundler"

        thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}"
        thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
        thread_pool.keep_alive_time="60000"
   />
   <TCPPING initial_hosts="10.240.0.27[7800],10.240.0.9[7800]" />
   <MERGE3 min_interval="10000"
           max_interval="30000"
   />
   <FD_SOCK />
   <!-- Suspect node `timeout` to `timeout + timeout_check_interval` millis after the last heartbeat -->
   <FD_ALL timeout="10000"
           interval="2000"
           timeout_check_interval="1000"
   />
   <VERIFY_SUSPECT timeout="1000"/>
   <pbcast.NAKACK2 use_mcast_xmit="false"
                   xmit_interval="100"
                   xmit_table_num_rows="50"
                   xmit_table_msgs_per_row="1024"
                   xmit_table_max_compaction_time="30000"
                   resend_last_seqno="true"
   />
   <UNICAST3 xmit_interval="100"
             xmit_table_num_rows="50"
             xmit_table_msgs_per_row="1024"
             xmit_table_max_compaction_time="30000"
   />
   <pbcast.STABLE stability_delay="500"
                  desired_avg_gossip="5000"
                  max_bytes="1M"
   />
   <pbcast.GMS print_local_addr="false"
               join_timeout="${jgroups.join_timeout:5000}"
   />
   <MFC max_credits="2m"
        min_threshold="0.40"
   />
   <FRAG3/>
</config>
0

There are 0 best solutions below