I am using RocksDB as the state backend in my Flink application.
Please take a look at this code:
public class Process extends KeyedProcessFunction<Tuple, Record, Result> {

    private transient ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(30))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
            .cleanupInBackground()
            .build();
        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
            "state",
            TypeInformation.of(Integer.class));
        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(Record value, Context ctx, Collector<Result> out) throws Exception {
        updateStates(value);
        Integer stateValue = state.value();
        logger.info("processElement -- subTaskName: {}, stateValue: {} , ", getRuntimeContext().getTaskNameWithSubtasks(), stateValue);
        if (stateValue % 2 == 0) {
            out.collect(..)
        }
    }

    private void updateStates(Record value) throws Exception {
        if (state.value() == null) {
            state.update(1);
        }
        Integer stateValue = state.value();
        logger.info("updateStates -- subTaskName: {}, stateValue: {} ", getRuntimeContext().getTaskNameWithSubtasks(), stateValue);
        if (something in record true) {
            stateValue++;
            state.update(stateValue);
        }
    }
}
In this process function I am getting a NullPointerException because stateValue in the process function is null.
Some of the log lines written inside updateStates() also show a null value:
2020-12-04 ... INFO ... - updateStates -- subTaskName: Process (9/40), stateValue: null
Why am I getting null values from the state even though I updated it just before?
Are the RocksDB update() and value() operations performed asynchronously?
At @DavidAnderson's request, I am adding the code that uses the KeyedProcessFunction:
public class Record {
    private String customerName;
    // getters and setters for customerName and other fields
}

private DataStream<Result> applyProcess(DataStream<Record> otherStream) {
    return otherStream
        // keep the record only if record.method() returns true; no exception is thrown here
        .filter(record -> record.method())
        .name("filtered")
        .keyBy("customerName")
        .process(new Process())
        .name("ProcessFunction");
}
UPDATE: This may seem silly, but the application works after I changed the code as follows (I removed the updateStates method and moved its logic into processElement):
@Override
public void processElement(Record value, Context ctx, Collector<Result> out) throws Exception {
    /*
    This case is not working...
    if (state.value() == null) {
        stateValue = 1;
    }
    */
    Integer stateValue = state.value();
    if (stateValue == null) {
        stateValue = 1;
    }
    if (something in record true) {
        stateValue++;
        state.update(stateValue);
    }
    if (stateValue % 2 == 0) {
        out.collect(..)
    }
}
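
For anyone who wants to reproduce the difference outside of Flink, here is a minimal, self-contained sketch of the pattern used in the working version: read the state once into a local variable, substitute a default when it is null, and only then do arithmetic on the local copy. Note that SimpleValueState and nextCount are hypothetical names invented for this illustration. SimpleValueState is a plain in-memory stand-in, not Flink's ValueState, and it does not model TTL, RocksDB, or keyed scoping, so it shows only the null-guard logic and says nothing about why the original read returned null.

```java
public class NullSafeStateSketch {

    /** Hypothetical in-memory stand-in for a value state; NOT Flink's ValueState. */
    public static final class SimpleValueState<T> {
        private T current; // null means "nothing stored"

        public T value() { return current; }
        public void update(T v) { current = v; }
        public void clear() { current = null; }
    }

    /**
     * Mirrors the working processElement logic: a single read, a null guard on
     * the local copy, an optional increment, and a write-back only on increment.
     */
    public static int nextCount(SimpleValueState<Integer> state, boolean increment) {
        Integer stored = state.value();            // read once
        int count = (stored == null) ? 1 : stored; // guard the local copy, never re-read
        if (increment) {
            count++;
            state.update(count);
        }
        return count;
    }

    public static void main(String[] args) {
        SimpleValueState<Integer> state = new SimpleValueState<>();
        System.out.println(nextCount(state, false)); // prints 1 (nothing stored yet)
        System.out.println(nextCount(state, true));  // prints 2 (default 1, then incremented)
        System.out.println(nextCount(state, true));  // prints 3
        state.clear();                               // simulate the value disappearing
        System.out.println(nextCount(state, false)); // prints 1 again, no NullPointerException
    }
}
```

The point of the sketch is that the arithmetic never touches a second state.value() call, so a null coming back from the state cannot reach the % operator.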