Flink savepoint not saving ValueState values


I'm writing a Flink program and can't get my stateful variables to survive when I restart the job from a savepoint.

I made a simple program with a Kafka source that receives messages and a RichFlatMapFunction that holds a ValueState variable. This variable is an integer that increases by 1 with every message.

I stop the job with a savepoint when the value is around 15, but when I restore from that savepoint the counter goes back to 1.

Streamingjob.java:

KeyedStream<JsonNode, Object> eventsByKey = env
                .addSource(consumer).name("Producer Topic Source")
                .keyBy(e -> {...});

eventsByKey
        .flatMap(new Test())
        .uid("test-id");

Test.java:

public class Test extends RichFlatMapFunction<JsonNode, JsonNode> {

    private transient ValueState<Integer> persistence;

    @Override
    public void flatMap(JsonNode node, Collector<JsonNode> collector) throws Exception {
        if (persistence.value() == null) persistence.update(1);
        String device_id = node.get("data").get("device_id").toString();
        System.out.println();
        System.out.println(device_id);
        System.out.println(persistence.value());
        System.out.println();
        persistence.update(persistence.value() + 1);
    }

    @Override
    public void open(Configuration config) {
        this.persistence = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "prueba", // the state name
                Integer.class));
    }
}

This is the command I use to stop the job with a savepoint:

../bin/flink stop --savepointPath f74c92af01ed51af94e530ee0e208d7c

And this is the one I use to start from the savepoint:

../bin/flink run flink-andy-12.3.0.jar --savepointPath file:/{...}/savepoint-f74c92-6acdb05afd11

Any ideas on what I should do?

There is 1 best solution below

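To restart from a savepoint you need to pass --fromSavepoint (short form -s) to flink run, not --savepointPath, which is an option of flink stop. The option also has to come before the jar file: anything placed after the jar is handed to your program's main method as an argument, so Flink never sees it and the job starts with empty state. (docs)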

In other words:

$ ./bin/flink run \
      --fromSavepoint /{...}/savepoint-f74c92-6acdb05afd11 \
      flink-andy-12.3.0.jar
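
For reference, the whole stop-and-resume cycle could look roughly like the sketch below. The job ID, jar name, and savepoint name are taken from the question; SAVEPOINT_DIR is a hypothetical target directory, and the FLINK variable is only a convenience added here so the script prints the commands (dry run) unless you point it at a real Flink binary.

```shell
#!/bin/sh
# Sketch of a stop-with-savepoint / resume cycle.
# By default this only prints the commands; run with FLINK=./bin/flink
# to execute them against a real cluster.
FLINK="${FLINK:-echo ./bin/flink}"

JOB_ID="f74c92af01ed51af94e530ee0e208d7c"              # job ID from the question
SAVEPOINT_DIR="/tmp/flink-savepoints"                   # hypothetical directory
SAVEPOINT="$SAVEPOINT_DIR/savepoint-f74c92-6acdb05afd11" # path printed by "flink stop"
JAR="flink-andy-12.3.0.jar"

# 1. Stop the job and write a savepoint into SAVEPOINT_DIR.
#    --savepointPath takes the target directory; the job ID comes last.
$FLINK stop --savepointPath "$SAVEPOINT_DIR" "$JOB_ID"

# 2. Resume from that savepoint. Options must come BEFORE the jar;
#    everything after the jar is passed to the program itself.
$FLINK run --fromSavepoint "$SAVEPOINT" "$JAR"
```

The state is matched back to the operator through its uid ("test-id" in the question), so keep that uid unchanged between the two runs.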