Flink: RocksDB state returns null even though it was updated previously


I am using RocksDB as the state backend in my Flink application.

Please take a look at this code:

public class Process extends KeyedProcessFunction<Tuple, Record, Result>{
    private transient ValueState<Integer> state;

    @Override
    public void open(Configuration parameters)
    {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .cleanupInBackground()
                .build();

        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
                "state",
                TypeInformation.of(Integer.class));

        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }


    @Override
    public void processElement(Record value, Context ctx, Collector<Result> out) throws Exception
    {
        updateStates(value);

        Integer stateValue = state.value();

        logger.info("processElement -- subTaskName: {}, stateValue: {} , ", getRuntimeContext().getTaskNameWithSubtasks() ,stateValue);

        if (stateValue % 2 == 0)
        {
            out.collect(..)
        }
    }


    private void updateStates(Record value) throws Exception
    {

        if (state.value() == null)
        {
            state.update(1);
        }

        Integer stateValue = state.value();
        
        logger.info("updateStates -- subTaskName: {}, stateValue: {} ", getRuntimeContext().getTaskNameWithSubtasks() ,stateValue);
        
        if (something in record true)
        {
            stateValue ++;
            state.update(stateValue);
        }
    }
}

In this process function I get a NullPointerException, because stateValue in processElement() is null.
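The NullPointerException itself comes from auto-unboxing: stateValue is an Integer, and the expression stateValue % 2 unboxes it to an int, which fails when the reference is null. A minimal stdlib-only illustration (UnboxingNpeDemo and isEven are hypothetical names, not part of the application):

```java
public class UnboxingNpeDemo {
    // Mirrors the check in processElement(): the Integer is unboxed to int for the % operator.
    public static boolean isEven(Integer stateValue) {
        return stateValue % 2 == 0;
    }

    public static void main(String[] args) {
        System.out.println(isEven(2)); // prints "true"
        try {
            isEven(null); // unboxing a null Integer throws NullPointerException
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from unboxing null");
        }
    }
}
```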

The logs from inside updateStates() also show a null value:

2020-12-04 ... INFO  ...  - updateStates -- subTaskName: Process (9/40), stateValue: null

Why am I getting null values from the state, even though I updated it previously?

Are RocksDB update() and value() operations performed asynchronously?

As @DavidAnderson requested, here is the code that uses the KeyedProcessFunction:
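For context on the expected behavior: Flink keyed state is scoped to the key of the element currently being processed, so a value() call that follows an update() for the same key should see the written value, while a different key starts from null. The sketch below is a toy, stdlib-only model of that per-key scoping; ToyKeyedValueState and its methods are hypothetical and are not Flink's API or implementation (it ignores TTL and RocksDB entirely):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed ValueState: one slot per key, selected by the "current key".
// Hypothetical class for illustration only; not how Flink implements state.
public class ToyKeyedValueState<K, V> {
    private final Map<K, V> slots = new HashMap<>();
    private K currentKey;

    // Stands in for the runtime setting the current key before each processElement() call.
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns null if nothing has been written for the current key.
    public V value() {
        return slots.get(currentKey);
    }

    // Writes immediately; a following value() for the same key sees the new value.
    public void update(V newValue) {
        slots.put(currentKey, newValue);
    }

    public static void main(String[] args) {
        ToyKeyedValueState<String, Integer> state = new ToyKeyedValueState<>();
        state.setCurrentKey("alice");
        if (state.value() == null) {
            state.update(1);
        }
        System.out.println(state.value()); // prints 1
        state.setCurrentKey("bob");
        System.out.println(state.value()); // prints null: a different key has its own slot
    }
}
```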

public class Record{
    private String customerName; 
    
    // getters and setter for customerName and other fields
}

private DataStream<Result> applyProcess(DataStream<Record> otherStream)
{
    return otherStream
            .filter(record -> record.method()) // keeps records for which record.method() returns true; no exception is thrown here
            .name("filtered")
            .keyBy("customerName")
            .process(new Process())
            .name("ProcessFunction");
}

UPDATE: It seems silly, but the application works when I change the code as follows (I removed the updateStates method and moved its logic into processElement):

@Override
    public void processElement(Record value, Context ctx, Collector<Result> out) throws Exception
    {
       /* 
        This case is not working...
        if (state.value() == null) {
            stateValue = 1;
        }
        */

        Integer stateValue = state.value();

        if (stateValue == null) {
            stateValue = 1;
        }

        if (something in record true)
        {
            stateValue ++;
            state.update(stateValue);
        }

        if (stateValue % 2 == 0)
        {
            out.collect(..)
        }
    }
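The working version reads the state once into a local variable, substitutes a default of 1 when it is null, and only writes back when the record condition holds, so the later % 2 check never sees null. That read-default-modify logic can be isolated as a plain function and checked without Flink. A minimal sketch, where StateValueLogic, nextStateValue, and the boolean increment flag are hypothetical stand-ins for the real state access and the "something in record true" condition:

```java
public class StateValueLogic {
    // Default used when the state has never been written for the current key.
    static final int DEFAULT_VALUE = 1;

    // Computes the value the workaround uses for the even/odd check.
    // current may be null (state.value() returned null); increment stands in
    // for the record condition.
    public static int nextStateValue(Integer current, boolean increment) {
        int stateValue = (current == null) ? DEFAULT_VALUE : current;
        if (increment) {
            stateValue++;
        }
        return stateValue;
    }

    // Mirrors the "stateValue % 2 == 0" check that decides whether to emit a result.
    public static boolean shouldEmit(Integer current, boolean increment) {
        return nextStateValue(current, increment) % 2 == 0;
    }

    public static void main(String[] args) {
        System.out.println(nextStateValue(null, true));  // prints 2
        System.out.println(nextStateValue(null, false)); // prints 1
        System.out.println(nextStateValue(5, true));     // prints 6
        System.out.println(shouldEmit(null, true));      // prints true
    }
}
```

Note that in this workaround the default of 1 lives only in the local variable: when the record condition is false, nothing is written to state, whereas updateStates() first writes 1 to state.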