I'm writing a Flink program and can't preserve my state when I restart the job from a savepoint.
I made a simple program with a Kafka source that receives messages and a RichFlatMapFunction with a ValueState variable. This variable is an integer that increases by 1 with every message.
I stop the job with a savepoint when the value is around 15, but when I restore from that savepoint the counter starts again at 1.
Streamingjob.java:
KeyedStream<JsonNode, Object> eventsByKey = env
        .addSource(consumer).name("Producer Topic Source")
        .keyBy(e -> {...});
eventsByKey
        .flatMap(new Test())
        .uid("test-id");
Test.java:
public class Test extends RichFlatMapFunction<JsonNode, JsonNode> {
    // Keyed state: one counter per key of the incoming stream
    private transient ValueState<Integer> persistence;

    @Override
    public void flatMap(JsonNode node, Collector<JsonNode> collector) throws Exception {
        // First message for this key: start the counter at 1
        if (persistence.value() == null) persistence.update(1);
        String device_id = node.get("data").get("device_id").toString();
        System.out.println();
        System.out.println(device_id);
        System.out.println(persistence.value());
        System.out.println();
        // Increment the counter for the next message
        persistence.update(persistence.value() + 1);
    }

    @Override
    public void open(Configuration config) {
        this.persistence = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "prueba", // the state name
                Integer.class));
    }
}
This is the command I use to stop the job with a savepoint:
../bin/flink stop --savepointPath f74c92af01ed51af94e530ee0e208d7c
And this one to start the job from the savepoint:
../bin/flink run flink-andy-12.3.0.jar --savepointPath file:/{...}/savepoint-f74c92-6acdb05afd11
Any ideas on what I should do?
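To restart from a savepoint you need to specify --fromSavepoint, and not --savepointPath (docs). The option also has to come before the JAR file, because anything placed after the JAR is passed to the program as its own arguments. In other words:
../bin/flink run --fromSavepoint file:/{...}/savepoint-f74c92-6acdb05afd11 flink-andy-12.3.0.jar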