Google Cloud Dataflow dropping messages


I have the following application, which runs a Dataflow pipeline that reads from a Google Cloud Pub/Sub subscription:

Config:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>2.0.0</version>
</dependency>

Code:

Pipeline p = Pipeline.create(options); // options: the pipeline options (initialisation omitted)
p.apply("read messages", readMessage())
    .apply("log message", logMessage());
p.run();

private static PubsubIO.Read<String> readMessage() {
    return PubsubIO.readStrings()
        .fromSubscription("my_subscription");
}

// logMessage() (not shown) just logs each message
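In Beam's Java SDK, PubsubIO.Read.fromSubscription is documented as expecting a fully qualified subscription path of the form projects/<project>/subscriptions/<subscription>, so the "my_subscription" literal above is presumably a shortened placeholder. A minimal sketch of building that path, assuming a hypothetical helper named subscriptionPath and a hypothetical project ID "my-project" (neither comes from the original code):

```java
public class SubscriptionPath {

    // Builds a fully qualified Pub/Sub subscription path of the form
    // "projects/<project>/subscriptions/<subscription>".
    // Hypothetical helper for illustration; not part of the Beam API.
    static String subscriptionPath(String projectId, String subscriptionName) {
        return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName);
    }

    public static void main(String[] args) {
        // "my-project" is a hypothetical project ID.
        String path = subscriptionPath("my-project", "my_subscription");
        // Prints: projects/my-project/subscriptions/my_subscription
        System.out.println(path);
        // In the pipeline, this string would be passed to
        // PubsubIO.readStrings().fromSubscription(path).
    }
}
```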

I noticed that messages were intermittently being dropped (i.e. they never made it from Pub/Sub into the Dataflow pipeline). To debug this, I wrote a second application that listens to the same Pub/Sub topic. Below is its config:

Config:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
    <version>0.22.0-beta</version>
</dependency>

<dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-pubsub</artifactId>
    <version>v1-rev358-1.22.0</version>
</dependency>

Code:

@Component
public class PubsubMessageReceiver implements MessageReceiver {

    private static final Logger logger = LoggerFactory.getLogger(PubsubMessageReceiver.class);

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        logger.info(message.getData().toStringUtf8());
        consumer.ack();
    }
}
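Whether this receiver sees every message depends on which subscription it is attached to: in Pub/Sub, each subscription on a topic receives its own copy of every message, while clients that share one subscription split that subscription's messages between them. The toy in-memory model below illustrates the difference. FanOutModel, subscribe and publish are hypothetical names, not the Pub/Sub client API, and the round-robin split is a simplification (real Pub/Sub delivery is at-least-once and not strictly round-robin).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Pub/Sub fan-out (hypothetical, NOT the Pub/Sub API):
// every subscription gets a copy of each message; subscribers that share
// a subscription split its messages (round-robin here, for simplicity).
public class FanOutModel {

    // subscription name -> one inbox per attached subscriber
    private final Map<String, List<List<String>>> subscriptions = new LinkedHashMap<>();
    // subscription name -> index of the subscriber that gets the next message
    private final Map<String, Integer> nextSubscriber = new LinkedHashMap<>();

    // Attaches one more subscriber to the named subscription and returns its inbox.
    List<String> subscribe(String subscription) {
        List<String> inbox = new ArrayList<>();
        subscriptions.computeIfAbsent(subscription, k -> new ArrayList<>()).add(inbox);
        nextSubscriber.putIfAbsent(subscription, 0);
        return inbox;
    }

    // Publishes to the topic: each subscription receives the message once,
    // delivered to exactly one of its subscribers.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> e : subscriptions.entrySet()) {
            List<List<String>> subscribers = e.getValue();
            int i = nextSubscriber.get(e.getKey());
            subscribers.get(i).add(message);
            nextSubscriber.put(e.getKey(), (i + 1) % subscribers.size());
        }
    }

    public static void main(String[] args) {
        FanOutModel topic = new FanOutModel();
        List<String> pipeline = topic.subscribe("my_subscription");
        List<String> otherClient = topic.subscribe("my_subscription"); // shares the subscription
        List<String> debugApp = topic.subscribe("debug_subscription"); // hypothetical name
        for (int i = 1; i <= 4; i++) {
            topic.publish("msg-" + i);
        }
        System.out.println(pipeline);    // [msg-1, msg-3]
        System.out.println(otherClient); // [msg-2, msg-4]
        System.out.println(debugApp);    // [msg-1, msg-2, msg-3, msg-4]
    }
}
```

In this model, a client that shares the pipeline's subscription takes messages away from it, whereas a client on its own subscription sees the full stream, which is why a separate subscription is the right vantage point for this kind of comparison.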

I let it run for a couple of hours (on a new subscription to the same topic as the original application), then checked the log files and found some messages that never made it to the Dataflow pipeline.
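The log comparison described above amounts to a set difference: payloads logged by the debug receiver minus payloads logged by the pipeline's "log message" step. A minimal sketch, assuming each payload is unique, both logs cover the same time window, and the payloads have already been parsed out of the log files (missingFromPipeline and the sample data are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: lists payloads the debug receiver logged
// but the pipeline never did, in first-seen order.
public class MissingMessages {

    static List<String> missingFromPipeline(List<String> debugLog, List<String> pipelineLog) {
        Set<String> seenByPipeline = new HashSet<>(pipelineLog);
        List<String> missing = new ArrayList<>();
        // LinkedHashSet drops duplicates (Pub/Sub delivery is at-least-once)
        // while keeping the order in which payloads first appeared.
        for (String payload : new LinkedHashSet<>(debugLog)) {
            if (!seenByPipeline.contains(payload)) {
                missing.add(payload);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical sample payloads standing in for the parsed log files.
        List<String> debugLog = Arrays.asList("order-1", "order-2", "order-2", "order-3", "order-4");
        List<String> pipelineLog = Arrays.asList("order-1", "order-3");
        System.out.println(missingFromPipeline(debugLog, pipelineLog)); // [order-2, order-4]
    }
}
```

If payloads are not unique, the Pub/Sub message ID (or an application-level ID) would need to be logged on both sides and compared instead.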

The problem is intermittent and I have not been able to reproduce it, even with load testing. Could this be caused by the Apache Beam / Dataflow libraries (it looks like a race condition or something similar)? If so, does anyone know whether it can be fixed by changing the configuration or upgrading to a newer version?
