Java DDS Unbounded Sequence Causes Out Of Memory Error

708 Views Asked by At

I am new to DDS, and trying to write a simple Java program in Intellij-IDEA that consists of 3 parts:

  1. Client Simulator that sends data.
  2. My program simulator that receive data from the client, manipulate it and sends it back to the client.
  3. Client Simulator that reads the manipulated data.

All the data that I am trying to send in my example is a simple String.

I am using RTI Code Gen to auto-generate most of the code.

Without and unboundedSupport flag (the string is limited to 255 characters) everything worked just fine. However, when applying the unboundedSupport flag, I am getting the following out-of-memory error:

java.lang.OutOfMemoryError: Java heap space
    at com.rti.dds.cdr.CdrBuffer.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.infrastructure.EntityImpl.DDS_Entity_enable(Native Method)
    at com.rti.dds.infrastructure.EntityImpl.enable(Unknown Source)
    at com.rti.dds.infrastructure.NativeFactoryMixin.create_entityI(Unknown Source)
    at com.rti.dds.subscription.SubscriberImpl.create_datareader(Unknown Source)
    at json_dds.JsonMessageSubscriber.<init>(JsonMessageSubscriber.java:71)
    at results_consumers.ResultsConsumersMain.main(ResultsConsumersMain.java:10)
create_datareader error

I am activating the client simulator that reads data first.

This is my .idl file:

struct JsonMessage {
    string msg;
};

This is my main program (line 10 is the initialization of subscriber1):

public static void main(String... args) {
    ClientResultsConsumer clientResultsConsumer = new ClientResultsConsumer();
    JsonMessageSubscriber subscriber1 = new JsonMessageSubscriber(0, clientResultsConsumer,
                                                                               Topics.CLIENT_TOPIC_OUTPUT_1);
    subscriber1.consume();
    ClientResultsConsumer2 clientResultsConsumer2 = new ClientResultsConsumer2();
    JsonMessageSubscriber subscriber2 = new JsonMessageSubscriber(0, clientResultsConsumer2,
                                                                               Topics.CLIENT_TOPIC_OUTPUT_1);
    subscriber2.consume();
    ClientResultsConsumer3 clientResultsConsumer3 = new ClientResultsConsumer3();
    JsonMessageSubscriber subscriber3 =
        new JsonMessageSubscriber(0, clientResultsConsumer3, Topics.CLIENT_TOPIC_OUTPUT_2);
    subscriber3.consume();
  }

This is my ClientResultsConsumer class:

public class ClientResultsConsumer implements Consumer {

  @Override
  public void consume(String msg) {
    System.out.println("Client results consumer got " + msg);
  }
}

This is my JsonMessageSubscriber class (line 71 is subscriber.create_datareader():

public class JsonMessageSubscriber implements DataConsumer {

  ExecutorService executor = Executors.newSingleThreadExecutor();

  public JsonMessageSubscriber(int domainId, Consumer consumer, String topicName) {

    DomainParticipant participant = DomainParticipantFactory.TheParticipantFactory
        .create_participant(domainId,
                            DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                            null /* listener */,
                            StatusKind.STATUS_MASK_NONE);
    if (participant == null) {
      System.err.println("create_participant error\n");
      System.exit(-1);
    }

    // --- Create subscriber --- //

            /* To customize subscriber QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    Subscriber subscriber = participant.create_subscriber(
        DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
        StatusKind.STATUS_MASK_NONE);
    if (subscriber == null) {
      System.err.println("create_subscriber error\n");
      System.exit(-1);
    }

    // --- Create topic --- //

    /* Register type before creating topic */
    String typeName = JsonMessageTypeSupport.get_type_name();
    JsonMessageTypeSupport.register_type(participant, typeName);

            /* To customize topic QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    Topic topic = participant.create_topic(
        topicName,
        typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
        null /* listener */, StatusKind.STATUS_MASK_NONE);
    if (topic == null) {
      System.err.println("create_topic error\n");
      System.exit(-1);
    }

    // --- Create reader --- //

    DataReaderListener listener = new JsonMessageListener(consumer);

            /* To customize data reader QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    JsonMessageDataReader reader = (JsonMessageDataReader)
        subscriber.create_datareader(
            topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
            StatusKind.STATUS_MASK_ALL);
    if (reader == null) {
      System.err.println("create_datareader error\n");
      System.exit(-1);
    }
  }

  // -----------------------------------------------------------------------

  @Override
  public void consume() {
    final long scanTimeMillis = 1000;
    Runnable task = () -> {
      while (true) {
        try {
          TimeUnit.MILLISECONDS.sleep(scanTimeMillis);
        } catch (Exception e) {
          System.err.println(e.getMessage());
        }
      }
    };
    executor.submit(task);
  }
}

Unfortunately, I didn't find a solution to that except limiting the sequence size, but I understood that limiting it to a large enough number will solve my problem, it will also require a lot of memory, and I would rather it not taking more than the minimum required for each message.

Any help will be appreciated, Thanks

2

There are 2 best solutions below

0
On BEST ANSWER

I managed to solve the problem using the example here

All it took was passing the auto generated qos file path to the subscriber/publisher constructor, and than writing these lines before initializing the domain participant (this is different than the example provided in the link above, the provided example did not work for me):

DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
DomainParticipantFactory.TheParticipantFactory.get_qos(factoryQos);
factoryQos.profile.url_profile.add(0, qosPolicyPath);
factoryQos.profile.url_profile.setMaximum(1);
DomainParticipantFactory.TheParticipantFactory.set_qos(factoryQos);
1
On

When using -unboundedSupport there are some memory thresholds that have to be set in your QoS file. These thresholds are described here in the user manual and they define the threshold in which memory for samples is either dynamically allocated or reused from a pre-allocated source. These must be set both in the DataReader and in the DataWriter.

The settings for these thresholds really depend on your data size and from your description I do not have enough information to provide you with an example that makes sense in your scenario. Basically, you do no want to be dynamically allocating memory for every sample. This might have an impact on performance, depending on your data rates. You want to select values where most samples are using the pre-allocated memory. The example provided in the user manual under the section "Writer-Side Memory Management when Working with Large Data" is of video streaming which contains larger less frequent I-frames and smaller more frequent P-frames. You can look at that section and the corresponding DataReader section for an example XML file.