I am using Flink 1.17.1 and running a Flink SQL application along the lines of:
CREATE TABLE enrich_table (
  col1 STRING,
  col2 ....
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'enrich_table',
  'properties.bootstrap.servers' = '${KAFKA_BROKERS}',
  'properties.group.id' = '${CONSUMER_GROUP_ID}',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = '${SCHEMA_REGISTRY_URL}',
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = '${SCHEMA_REGISTRY_URL}'
);
INSERT INTO enrich_table
SELECT
  col1, col2 ...
FROM table1 t1
INNER JOIN table2 t2 ON ..
INNER JOIN table3 t3 ON ...
.....
I am setting LZ4 compression for RocksDB by creating a custom options factory:
package my.org.abc

import org.apache.flink.configuration.{ConfigOption, ConfigOptions, ReadableConfig}
import org.apache.flink.configuration.description.Description
import org.apache.flink.configuration.description.LinkElement.link
import org.apache.flink.contrib.streaming.state.{ConfigurableRocksDBOptionsFactory, DefaultConfigurableOptionsFactory, RocksDBOptionsFactory}
import org.rocksdb.{ColumnFamilyOptions, CompressionType, DBOptions}

class RocksDBTuningJobOptionsFactory extends ConfigurableRocksDBOptionsFactory {

  // Delegate to the default factory so the stock state.backend.rocksdb.* options keep working.
  private val defaultFactory = new DefaultConfigurableOptionsFactory
  private var configuration: ReadableConfig = _

  // Custom config key used to pick the RocksDB compression type.
  private val COMPRESSION: ConfigOption[CompressionType] =
    ConfigOptions
      .key("state.backend.rocksdb.custom.compression")
      .enumType(classOf[CompressionType])
      .noDefaultValue()
      .withDescription(
        Description
          .builder()
          .text("Configures RocksDB compression.")
          .linebreak()
          .text(
            "For more information, please refer to %s",
            link("https://github.com/facebook/rocksdb/wiki/Compression")
          )
          .build()
      )

  override def configure(configuration: ReadableConfig): RocksDBOptionsFactory = {
    defaultFactory.configure(configuration)
    this.configuration = configuration
    this
  }

  override def createDBOptions(
      currentOptions: DBOptions,
      handlesToClose: java.util.Collection[AutoCloseable]): DBOptions =
    defaultFactory.createDBOptions(currentOptions, handlesToClose)

  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: java.util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    val updatedOptions = defaultFactory.createColumnOptions(currentOptions, handlesToClose)
    // Apply the configured compression type to the options object we return.
    configuration.getOptional(COMPRESSION).ifPresent(updatedOptions.setCompressionType)
    updatedOptions
  }
}
and providing the following configuration:
........
state.backend.rocksdb.use-bloom-filter: "true"
state.backend.rocksdb.options-factory: "my.org.abc.RocksDBTuningJobOptionsFactory"
state.backend.rocksdb.custom.compression: "LZ4_COMPRESSION"
Q#1: As per the discussion here, LZ4 improves performance. I'm just wondering: what does the community use for large-state, high-volume, high-velocity Flink SQL applications (without temporal joins)?
Q#2: In order to set LZ4 compression, do we have to extend ConfigurableRocksDBOptionsFactory, or is there a better way?
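For context on Q#2: if the compression type did not need to be runtime-configurable, my understanding is that implementing the plain RocksDBOptionsFactory interface would be enough, without the ConfigurableRocksDBOptionsFactory machinery. A minimal sketch of that variant (the class name Lz4OptionsFactory is illustrative, not from any library):

package my.org.abc

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
import org.rocksdb.{ColumnFamilyOptions, CompressionType, DBOptions}

// Hypothetical minimal factory: hard-codes LZ4 instead of reading it from configuration.
class Lz4OptionsFactory extends RocksDBOptionsFactory {

  override def createDBOptions(
      currentOptions: DBOptions,
      handlesToClose: java.util.Collection[AutoCloseable]): DBOptions =
    currentOptions

  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: java.util.Collection[AutoCloseable]): ColumnFamilyOptions =
    currentOptions.setCompressionType(CompressionType.LZ4_COMPRESSION)
}

The trade-off is that changing the compression type then requires a rebuild rather than a config change.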
Q#3: Is there any impact of changing the compression (to LZ4) on an existing job that is already running and has checkpoints?
From the Speedb Hive:
Q1: The default compression is Snappy.
Q2: That's a Flink question.
Q3: RocksDB and Speedb allow changing the compression type at any time. If Flink does not support this, we at Speedb created a backdoor to change any mutable option while running.
You can find the Speedb Hive here and (once you've registered) the thread with your question here, if you have more questions or need additional info.
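To illustrate the "change at any time" point: this is safe because each SST file records the compression it was written with, so existing files remain readable and only newly flushed or compacted files pick up the new type. A minimal sketch of what the runtime change could look like through RocksJava's mutable-options API, assuming your RocksJava build exposes compression on the mutable column-family options builder (db and cfHandle stand for an already-open database and column family handle; Flink itself does not expose this hook):

import org.rocksdb.{ColumnFamilyHandle, CompressionType, MutableColumnFamilyOptions, RocksDB}

// Sketch only: switch a live column family to LZ4 without a restart.
// Assumption: this RocksJava build lists compression among the mutable CF options.
def switchToLz4(db: RocksDB, cfHandle: ColumnFamilyHandle): Unit = {
  val mutableOpts = MutableColumnFamilyOptions
    .builder()
    .setCompressionType(CompressionType.LZ4_COMPRESSION) // assumption: available on the builder
    .build()
  db.setOptions(cfHandle, mutableOpts)
}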