Why does the Flink exactly-once commit not fail?


I am using Flink with exactly-once (EO) semantics. When restarting the Flink job I get this warning:

2023-12-30 13:07:44.538 [Co-Flat Map -> Sink: Sink1 (3/8)#0] WARN o.a.f.s.api.functions.sink.TwoPhaseCommitSinkFunction - Transaction KafkaTransactionState [transactionalId=ABCD, producerId=1028910, epoch=3] has been open for 2813524 ms. This is close to or even exceeding the transaction timeout of 900000 ms.

But then the commit succeeds. I do not understand why the commit succeeds if the transaction has been open for longer than the configured timeout (transaction.max.timeout.ms).

2023-12-30 13:07:44.907 [Co-Flat Map -> Sink: Sink1 (3/8)#0] INFO o.a.f.s.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 3/8 committed recovered transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=ABCD, producerId=1028910, epoch=3], transactionStartTime=1703938851014}

I even checked the TwoPhaseCommitSinkFunction code; shouldn't this warning mean that the commit should definitely fail?

private void recoverAndCommitInternal(TransactionHolder<TXN> transactionHolder) {
    try {
        logWarningIfTimeoutAlmostReached(transactionHolder);
        recoverAndCommit(transactionHolder.handle);
    } catch (final Exception e) {
        final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
        if (ignoreFailuresAfterTransactionTimeout && elapsedTime > transactionTimeout) {
            LOG.error(
                    "Error while committing transaction {}. "
                            + "Transaction has been open for longer than the transaction timeout ({})."
                            + "Commit will not be attempted again. Data loss might have occurred.",
                    transactionHolder.handle,
                    transactionTimeout,
                    e);
        } else {
            throw e;
        }
    }
}

private void logWarningIfTimeoutAlmostReached(TransactionHolder<TXN> transactionHolder) {
    final long elapsedTime = transactionHolder.elapsedTime(clock);
    if (transactionTimeoutWarningRatio >= 0
            && elapsedTime > transactionTimeout * transactionTimeoutWarningRatio) {
        LOG.warn(
                "Transaction {} has been open for {} ms. "
                        + "This is close to or even exceeding the transaction timeout of {} ms.",
                transactionHolder.handle,
                elapsedTime,
                transactionTimeout);
    }
}

Is there something I am missing? The issue is that even if I start the job after 1 hour, the older transactions still get committed.

Is it happening because Flink is resuming the transaction? Does Kafka allow aborted messages to be recovered if given the same producer id and epoch?

producer = initTransactionalProducer(transaction.transactionalId, false);
producer.resumeTransaction(transaction.producerId, transaction.epoch);
producer.commitTransaction();

What parameters actually decide whether a commit will fail, if not the timeout?

FYI: transaction.abort.timed.out.transaction.cleanup.interval.ms uses the default value.


3 Answers

Answered by Avishek Bhattacharya

The warning message you’re seeing is logged by the logWarningIfTimeoutAlmostReached method, which checks if the transaction has been open for a duration close to or exceeding the transaction timeout. However, this does not necessarily mean that the transaction will fail to commit.

In the two-phase commit protocol, it is possible for the protocol to stall for a significant amount of time. This is known as the blocking problem in 2PC.

The warning is there to alert you that the transaction has been open for a long time, which could potentially lead to issues, but it doesn’t automatically cause the transaction to fail.

Regarding your question about Kafka allowing aborted messages to be recovered if given the same producer id and epoch, the answer is yes. Kafka’s transactional producer uses the producer id and epoch to identify and manage ongoing transactions. If a producer with the same id and epoch resumes a transaction, it can continue where it left off.
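To make that concrete, here is the check from the quoted logWarningIfTimeoutAlmostReached with the numbers from your log plugged in (a minimal sketch; the 0.8 warning ratio is an assumed value for illustration, since the actual transactionTimeoutWarningRatio is not shown in the question). The branch only logs; it never throws, so the commit attempt that follows is unaffected:

public class WarningCheckSketch {
    public static void main(String[] args) {
        long elapsedTime = 2_813_524L;       // "has been open for 2813524 ms"
        long transactionTimeout = 900_000L;  // "transaction timeout of 900000 ms"
        double transactionTimeoutWarningRatio = 0.8; // assumption, not from the question

        if (transactionTimeoutWarningRatio >= 0
                && elapsedTime > transactionTimeout * transactionTimeoutWarningRatio) {
            // In the real code this is LOG.warn(...); nothing is thrown here,
            // so recoverAndCommit(...) is still called right afterwards.
            System.out.println("WARN: transaction open for " + elapsedTime
                    + " ms, close to or exceeding the timeout of " + transactionTimeout + " ms");
        }
    }
}

The commit itself only fails if recoverAndCommit throws, and even then the quoted catch block swallows the exception (logging an error about possible data loss) when ignoreFailuresAfterTransactionTimeout is set and the elapsed time exceeds the timeout.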

Answered by kkrugler

Flink doesn't know that (on the Kafka side) a transaction has timed out, which is why I believe you don't get a failure from Flink. If the transaction times out, you can drop (lose) records. See https://ververica.zendesk.com/hc/en-us/articles/360013269680 for more details.
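If the worry is losing records after long downtime, the usual mitigation described in that article is to raise the producer-side transaction.timeout.ms so it covers the longest expected outage, keeping it at or below the broker's transaction.max.timeout.ms (15 minutes by default). Below is a rough sketch of how that could be set on the (pre-KafkaSink) FlinkKafkaProducer; the broker address, topic name, and serialization are placeholders, not taken from the job in the question:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SinkConfigSketch {

    static FlinkKafkaProducer<String> buildSink() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder

        // Producer-side transaction timeout. It must not exceed the broker's
        // transaction.max.timeout.ms, otherwise the producer fails at
        // initialization; raise both if you need to survive longer outages.
        producerProps.setProperty("transaction.timeout.ms", "900000");

        return new FlinkKafkaProducer<>(
                "output-topic", // placeholder default topic
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<>("output-topic",
                                element.getBytes(StandardCharsets.UTF_8)),
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}

The same transaction.timeout.ms property applies if the job is later migrated to the newer KafkaSink, which also accepts a Kafka producer config.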

Answered by David Anderson

There are issues with the current design that make it impossible for Flink to cope with Kafka transaction timeouts. See KIP-939: Support Participation in 2PC and FLIP-319: Integrate with Kafka's Support for Proper 2PC Participation (KIP-939) for all the details, including a proposal for how this can be fixed.