How to achieve exactly-once in Cassandra when stream processing?


I have a Cassandra table which looks like this

CREATE TABLE tmp.inventory (
    t_id text,
    is_available boolean,
    modified_at bigint,
    price double,
    available_units bigint,
    PRIMARY KEY(t_id, modified_at)
);

I have a streaming pipeline which updates the items in Cassandra. The pipeline is checkpointed at an interval, so when it fails, it re-processes the source data since the last successful checkpoint. When it re-processes after a failure, it will try to overwrite data in Cassandra that was already written successfully (i.e. after the last successful checkpoint but before the failure). I was thinking of leveraging the modified_at column to guard against this. Something like

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? AND modified_at < ?

This is me trying to do the update only if the modified_at in Cassandra is less than the modified_at in the pipeline. However, this throws InvalidQueryException: Slice restrictions are not supported on the clustering columns in UPDATE statements

I thought an IF condition could help in this case.

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? IF modified_at < ?

but this throws InvalidQueryException: PRIMARY KEY column 'modified_at' cannot have IF conditions

So what would be the ideal way to handle this?

Edit: If these were the only fields in the table, re-processing the events might not be that big of a problem, since the table would eventually become consistent once the pipeline catches up to the live stream. But say there is another streaming job which updates the same table with the current price, available units, etc. In that case, if one of the jobs fails and restarts, the table could end up in an inconsistent state.


There is 1 best solution below


To avoid a situation where one thread writes older data after another thread has already inserted newer data, you can use USING TIMESTAMP when doing the INSERT or UPDATE (in Cassandra every write is an upsert anyway, so INSERT could be easier from a syntax perspective, imho). The idea is that you explicitly specify the timestamp of the record, so when another thread inserts older data later than the previous thread, the data will still be written, but it won't win, because Cassandra uses the (explicitly specified) timestamp to determine the latest version. Something like this:

INSERT INTO tmp.inventory (t_id, is_available, modified_at)
  VALUES (?, ?,?)
  USING TIMESTAMP <modified_at*1000>
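To make the "older data won't win" behaviour concrete, here is a toy in-memory model of the last-write-wins rule in Python. It is only a sketch and not driver code: `Cell` and `apply_write` are hypothetical names, and it ignores how Cassandra breaks ties between equal timestamps.

```python
from dataclasses import dataclass


@dataclass
class Cell:
    """One column value plus the write timestamp it was stored with."""
    value: object
    write_ts_micros: int


def apply_write(row: dict, column: str, value, write_ts_micros: int) -> None:
    """Keep the incoming value only if its timestamp is newer than the stored one.

    Hypothetical helper that mimics last-write-wins for a single row.
    Equal timestamps keep the existing value here; Cassandra's real
    tie-breaking is not modelled.
    """
    current = row.get(column)
    if current is None or write_ts_micros > current.write_ts_micros:
        row[column] = Cell(value, write_ts_micros)


row = {}
# The pipeline writes the newer event first...
apply_write(row, "is_available", True, 1_700_000_002_000_000)
# ...then, after a restart, replays an older event for the same column.
apply_write(row, "is_available", False, 1_700_000_001_000_000)
# A second job updates a different column with its own timestamp.
apply_write(row, "price", 9.99, 1_700_000_000_500_000)
```

The replayed write is accepted but does not change what a read returns, and because the timestamp is tracked per column, the second job's price update is not affected by the first job's replay.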

The only thing to remember is that the value in USING TIMESTAMP is in microseconds, not milliseconds, and you need to calculate the value of <modified_at*1000> yourself: you can't use an expression there (it's shown here just as an example).
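A minimal Python sketch of doing that calculation on the client side, assuming modified_at holds epoch milliseconds and that the timestamp is passed as an extra bind marker (`USING TIMESTAMP ?`). The names `to_write_timestamp_micros`, `INSERT_CQL` and `bind_values` are made up for illustration.

```python
def to_write_timestamp_micros(modified_at_ms: int) -> int:
    """Convert epoch milliseconds to the microseconds USING TIMESTAMP expects."""
    return modified_at_ms * 1000


# The write timestamp is supplied as a fourth bind marker instead of an expression.
INSERT_CQL = (
    "INSERT INTO tmp.inventory (t_id, is_available, modified_at) "
    "VALUES (?, ?, ?) USING TIMESTAMP ?"
)


def bind_values(t_id: str, is_available: bool, modified_at_ms: int) -> tuple:
    """Build the bind values for INSERT_CQL, in marker order."""
    return (
        t_id,
        is_available,
        modified_at_ms,
        to_write_timestamp_micros(modified_at_ms),
    )
```

With the DataStax Python driver this would typically be used as `session.execute(session.prepare(INSERT_CQL), bind_values(...))`, though in real code the statement should be prepared once and reused.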