Impact of LZ4 compression on a Flink SQL application using the RocksDB state backend


I am using Flink 1.17.1 and running a Flink SQL application along these lines:

CREATE TABLE enrich_table (
    col1 STRING,
    col2 ...,
    PRIMARY KEY (...) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'enrich_table',
    'properties.bootstrap.servers' = '${KAFKA_BROKERS}',
    'properties.group.id' = '${CONSUMER_GROUP_ID}',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '${SCHEMA_REGISTRY_URL}',
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = '${SCHEMA_REGISTRY_URL}'
);


INSERT INTO enrich_table
SELECT
    col1, col2, ...
FROM
    table1 t1
    INNER JOIN table2 t2 ON ...
    INNER JOIN table3 t3 ON ...
    ...

I am setting LZ4 compression for RocksDB by creating a custom options factory:

package my.org.abc

import org.apache.flink.configuration.{ConfigOption, ConfigOptions, ReadableConfig}
import org.apache.flink.configuration.description.Description
import org.apache.flink.configuration.description.LinkElement.link
import org.apache.flink.contrib.streaming.state.{ConfigurableRocksDBOptionsFactory, DefaultConfigurableOptionsFactory, RocksDBOptionsFactory}
import org.rocksdb.{ColumnFamilyOptions, CompressionType, DBOptions}

class RocksDBTuningJobOptionsFactory extends ConfigurableRocksDBOptionsFactory {
  private val defaultFactory = new DefaultConfigurableOptionsFactory
  private var configuration: ReadableConfig = _

  // Custom config option that exposes RocksDB's compression type through Flink configuration.
  private val COMPRESSION: ConfigOption[CompressionType] =
    ConfigOptions.key("state.backend.rocksdb.custom.compression")
      .enumType(classOf[CompressionType])
      .noDefaultValue()
      .withDescription(
        Description.builder()
          .text("Configures RocksDB compression")
          .linebreak()
          .text(
            "For more information, please refer to %s",
            link("https://github.com/facebook/rocksdb/wiki/Compression")
          )
          .build()
      )

  override def configure(configuration: ReadableConfig): RocksDBOptionsFactory = {
    defaultFactory.configure(configuration)
    this.configuration = configuration
    this
  }

  override def createDBOptions(currentOptions: DBOptions, handlesToClose: java.util.Collection[AutoCloseable]): DBOptions =
    defaultFactory.createDBOptions(currentOptions, handlesToClose)

  override def createColumnOptions(currentOptions: ColumnFamilyOptions, handlesToClose: java.util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    val updatedOptions = defaultFactory.createColumnOptions(currentOptions, handlesToClose)
    // Apply the configured compression type to the options object that is returned.
    configuration.getOptional(COMPRESSION).ifPresent(updatedOptions.setCompressionType)
    updatedOptions
  }
}

and providing the following configuration:

........
state.backend.rocksdb.use-bloom-filter: "true"
state.backend.rocksdb.options-factory: "my.org.abc.RocksDBTuningJobOptionsFactory"
state.backend.rocksdb.custom.compression: "LZ4_COMPRESSION"
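
For reference, outside of a pure SQL deployment the same factory can also be attached programmatically to the state backend instead of through flink-conf.yaml. A minimal sketch, assuming the job also has access to the DataStream API:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Enable incremental checkpoints and attach the custom options factory from above.
val backend = new EmbeddedRocksDBStateBackend(true)
backend.setRocksDBOptions(new RocksDBTuningJobOptionsFactory)
env.setStateBackend(backend)

For a job submitted purely as SQL, the state.backend.rocksdb.options-factory key shown above remains the mechanism; the factory class just has to be on the job's classpath under the exact fully qualified name.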

Q#1: As per the discussion here, LZ4 improves performance. I am wondering what the community uses for large-state, high-volume, high-velocity Flink SQL applications (without temporal joins)?

Q#2: In order to set LZ4 compression, do we have to extend ConfigurableRocksDBOptionsFactory, or is there a better way?

Q#3: Is there any impact of changing the compression (to LZ4) on an existing job that is already running and has checkpoints?


There is 1 answer below.


From the Speedb Hive:

Q1: The default compression in RocksDB is Snappy.

Q2: That is a Flink question.

Q3: RocksDB and Speedb allow changing the compression type at any time. If Flink does not support this, we at Speedb created a backdoor to change any mutable option while running.
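
To illustrate why changing compression is safe for an existing database: compression is a per-SST-file property in RocksDB, so each existing file records how it was compressed and stays readable, while only newly written files use the new setting. A minimal standalone-RocksDB sketch (not Flink-specific; the path and keys are just for illustration):

import org.rocksdb.{CompressionType, Options, RocksDB}

RocksDB.loadLibrary()

// Open a database (possibly written earlier with Snappy) with LZ4 enabled.
// Existing SST files remain readable; only newly flushed files use LZ4.
val options = new Options()
  .setCreateIfMissing(true)
  .setCompressionType(CompressionType.LZ4_COMPRESSION)
val db = RocksDB.open(options, "/tmp/compression-demo")

db.put("key".getBytes, "value".getBytes)

// A full compaction rewrites the old files, re-encoding them with LZ4.
db.compactRange()

db.close()
options.close()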

You can find the Speedb Hive here and (once you've registered) the link to the thread with your question here, if you have more questions or need additional info.