Is there a retention policy for a custom state store (RocksDB) with Kafka Streams?


I am setting up a new Kafka Streams application and want to use a custom state store backed by RocksDB. Putting data into the store, obtaining a queryable state store from it, and iterating over the data all work fine. However, after about 72 hours I observe data missing from the store. Is there a default retention time on data for state stores in Kafka Streams or in RocksDB?
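
For comparison, the built-in factories in org.apache.kafka.streams.state.Stores only expose retention on windowed and session stores; the persistent key-value variant has no retention or TTL parameter at all (store names below are illustrative):

    // Windowed stores take an explicit retention period...
    StoreBuilder<WindowStore<String, String>> windowed = Stores.windowStoreBuilder(
            Stores.persistentWindowStore("windowed-store", Duration.ofDays(7), Duration.ofHours(1), false),
            Serdes.String(), Serdes.String());

    // ...but persistent key-value stores have nowhere to configure one
    StoreBuilder<KeyValueStore<String, String>> kv = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("kv-store"),
            Serdes.String(), Serdes.String());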

I am using a custom RocksDB-backed state store so that we can use the column family feature, which is not available through the embedded RocksDB implementation in Kafka Streams. I have implemented the custom store against the KeyValueStore interface, and have my own StoreSupplier, StoreBuilder, StoreType, and StoreWrapper as well. A changelog topic is created for the application, but no data is going to it yet (I haven't looked into that problem yet).
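
For context, a simplified sketch of what such a builder can look like against the org.apache.kafka.streams.state.StoreBuilder interface; the real class also wires serdes and the StoreWrapper, and the caching/logging handling is stubbed out here:

    public class RocksDBColumnFamilyStoreBuilder implements StoreBuilder<RocksDBColumnFamilyStore> {
        private final String name;
        private final List<String> cfNames;
        private Map<String, String> logConfig = new HashMap<>();
        private boolean loggingEnabled = true;

        public RocksDBColumnFamilyStoreBuilder(String name, List<String> cfNames) {
            this.name = name;
            this.cfNames = cfNames;
        }

        @Override
        public RocksDBColumnFamilyStore build() {
            // "rocksdb" as the parent directory is an assumption for this sketch
            return new RocksDBColumnFamilyStore(name, "rocksdb", cfNames);
        }

        @Override
        public StoreBuilder<RocksDBColumnFamilyStore> withCachingEnabled() { return this; }

        @Override
        public StoreBuilder<RocksDBColumnFamilyStore> withCachingDisabled() { return this; }

        @Override
        public StoreBuilder<RocksDBColumnFamilyStore> withLoggingEnabled(Map<String, String> config) {
            this.logConfig = config;
            this.loggingEnabled = true;
            return this;
        }

        @Override
        public StoreBuilder<RocksDBColumnFamilyStore> withLoggingDisabled() {
            this.loggingEnabled = false;
            return this;
        }

        @Override
        public Map<String, String> logConfig() { return logConfig; }

        @Override
        public boolean loggingEnabled() { return loggingEnabled; }

        @Override
        public String name() { return name; }
    }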

Putting data into this custom state store and getting a queryable state store from it works fine. However, data is missing from the store after about 72 hours. I verified this both by checking the size of the state store directory and by exporting the data to files and counting the entries.

The store uses SNAPPY compression and UNIVERSAL compaction.

Simple topology:

    final StreamsBuilder builder = new StreamsBuilder();
    String storeName = "store-name";
    List<String> cfNames = new ArrayList<>();

    // Hybrid custom store, registered with the topology by name
    final StoreBuilder customStore = new RocksDBColumnFamilyStoreBuilder(storeName, cfNames);
    builder.addStateStore(customStore);

    KStream<String, String> inputStream = builder.stream(
            inputTopicName,
            Consumed.with(Serdes.String(), Serdes.String()));

    // The transformer is given access to the store by passing its name
    inputStream.transform(() -> new CurrentTransformer(storeName), storeName);

    Topology tp = builder.build();
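
CurrentTransformer itself is not shown; a minimal sketch of its assumed shape, looking the store up by name in init() (the generic types and the pass-through transform are assumptions for illustration):

    public class CurrentTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private final String storeName;
        private RocksDBColumnFamilyStore store;

        public CurrentTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public void init(ProcessorContext context) {
            // The store registered via builder.addStateStore() is retrieved by name
            store = (RocksDBColumnFamilyStore) context.getStateStore(storeName);
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            store.put(key, value); // via the KeyValueStore interface the store implements
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() {
        }
    }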

Snippet from custom store implementation:

    RocksDBColumnFamilyStore(final String name, final String parentDir, final List<String> columnFamilyNames) {
        // ... field initialization (cache, filter, BLOCK_SIZE, etc.) elided ...

        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCache(cache)
                .setBlockSize(BLOCK_SIZE)
                .setCacheIndexAndFilterBlocks(true)
                .setPinL0FilterAndIndexBlocksInCache(true)
                .setFilterPolicy(filter)
                .setCacheIndexAndFilterBlocksWithHighPriority(true)
                .setPinTopLevelIndexAndFilter(true);

        cfOptions = new ColumnFamilyOptions()
                .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
                .setCompactionStyle(CompactionStyle.UNIVERSAL)
                .setMaxWriteBufferNumber(MAX_WRITE_BUFFERS)
                .setOptimizeFiltersForHits(true)
                .setLevelCompactionDynamicLevelBytes(true)
                .setTableFormatConfig(tableConfig);

        // The default column family plus one descriptor per requested name,
        // all sharing the same ColumnFamilyOptions
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions));
        columnFamilyNames.forEach(cfName ->
                columnFamilyDescriptors.add(new ColumnFamilyDescriptor(cfName.getBytes(), cfOptions)));
    }

    @SuppressWarnings("unchecked")
    public void openDB(final ProcessorContext context) {
        // prepareForBulkLoad() tunes for bulk ingestion; note it also disables
        // automatic compactions, which then have to be triggered manually
        Options opts = new Options()
                .prepareForBulkLoad();

        options = new DBOptions(opts)
                .setCreateIfMissing(true)
                .setErrorIfExists(false)
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                .setMaxOpenFiles(-1)
                .setWriteBufferManager(writeBufferManager)
                .setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2))
                .setCreateMissingColumnFamilies(true);

        fOptions = new FlushOptions();
        fOptions.setWaitForFlush(true);

        dbDir = new File(new File(context.stateDir(), parentDir), name);
        try {
            Files.createDirectories(dbDir.getParentFile().toPath());
            // Opening with column family descriptors populates columnFamilyHandles
            // in the same order as the descriptors
            db = RocksDB.open(options, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);

            columnFamilyHandles.forEach((handle) -> {
                try {
                    columnFamilyMap.put(new String(handle.getName()), handle);
                } catch (RocksDBException e) {
                    throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
                }
            });
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
        }
        open = true;
    }
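
The store's read/write path is also elided above; illustrative put/get methods against a named column family, using the columnFamilyMap populated in openDB (the signatures are assumptions, not the exact ones in my store):

    public void put(final String cfName, final byte[] key, final byte[] value) {
        final ColumnFamilyHandle handle = columnFamilyMap.get(cfName);
        try {
            db.put(handle, key, value); // write into the given column family
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error putting to store " + name, e);
        }
    }

    public byte[] get(final String cfName, final byte[] key) {
        final ColumnFamilyHandle handle = columnFamilyMap.get(cfName);
        try {
            return db.get(handle, key); // returns null if the key is absent
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error getting from store " + name, e);
        }
    }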

My expectation is that the state store (RocksDB) will retain the data indefinitely, until it is manually deleted or the storage disk fails. I am not aware of Kafka Streams having introduced a TTL for state stores yet.
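
For reference, RocksDB itself only expires entries when the database is opened through TtlDB (expiry is then enforced during compaction); the plain RocksDB.open(...) used above applies no TTL. A minimal sketch with a hypothetical path:

    try (Options ttlOpts = new Options().setCreateIfMissing(true);
         // TTL is given in seconds; 259200 = 72 hours, readOnly = false
         TtlDB ttlDb = TtlDB.open(ttlOpts, "/tmp/ttl-example", 259200, false)) {
        ttlDb.put("key".getBytes(), "value".getBytes());
    } catch (RocksDBException e) {
        // handle open/put failure
    }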
