I'm writing a Flink program and can't get my state restored when I restart the job from a savepoint.
I made a simple program with a Kafka source connector that receives messages, followed by a RichFlatMapFunction holding a ValueState variable. This variable is an integer that increases by 1 with every message.
I stop the job with a savepoint when the value is around 15, but when I restore from that savepoint the counter goes back to 1.
Streamingjob.java:
KeyedStream<JsonNode, Object> eventsByKey = env
                .addSource(consumer).name("Producer Topic Source")
                .keyBy(e -> {...});
eventsByKey 
        .flatMap(new Test())
        .uid("test-id")
Test.java:
public class Test extends RichFlatMapFunction<JsonNode, JsonNode> {
    private transient ValueState<Integer> persistence;
    @Override
    public void flatMap(JsonNode node, Collector<JsonNode> collector) throws Exception {
        if (persistence.value() == null) persistence.update(1);
        String device_id = node.get("data").get("device_id").toString();
        System.out.println();
        System.out.println(device_id);
        System.out.println(persistence.value());
        System.out.println();
        persistence.update(persistence.value() + 1);
    }
    @Override
    public void open(Configuration config) {
        this.persistence = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "prueba", // the state name
                Integer.class));
    }
}
This is the command I use to stop the job with a savepoint:
../bin/flink stop --savepointPath f74c92af01ed51af94e530ee0e208d7c
And this is the one I use to start the job from the savepoint:
../bin/flink run flink-andy-12.3.0.jar --savepointPath file:/{...}/savepoint-f74c92-6acdb05afd11
Any ideas on what I should do?
                        
To restart from a savepoint you need to specify --fromSavepoint, and not --savepointPath. (docs) In other words:
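A sketch of the corrected command, assuming the same jar and savepoint directory as in the question (the {...} part of the path is elided there and is left as-is here). The option is placed before the jar on purpose: the Flink CLI hands everything after the jar file to the program as its own arguments, so an option written after the jar never reaches the CLI.

```shell
# --fromSavepoint (short form: -s) tells "flink run" which savepoint to restore from.
# It must come before the jar; anything after the jar is passed to the job's main().
../bin/flink run --fromSavepoint file:/{...}/savepoint-f74c92-6acdb05afd11 flink-andy-12.3.0.jar
```

With the state restored, the counter should continue from the value it had at the savepoint instead of starting again at 1, provided the operator keeps the same uid ("test-id") and state name ("prueba").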