I have a PyFlink script that loads data from Kafka into a StarRocks database. The Kafka connection is successful, but when the job sinks data to StarRocks I receive the exception below.
Flink version 1.17, StarRocks 5.1.0
The exception (it looks like the StarRocks sink class cannot be resolved by the job's classloader):
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
Code:
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment


def table_joins(t_env, env):
    try:
        # Target database for both the Kafka source and the StarRocks sink tables.
        t_env.execute_sql("CREATE DATABASE IF NOT EXISTS test")

        # Kafka source: the JSON payload carries the new row state in `after`.
        kfk_read = """
            CREATE TABLE test.test_kafka (
                `after` ROW(`ID` BIGINT)
            ) WITH (
                'connector' = 'kafka',
                'topic' = 'TEST',
                'properties.bootstrap.servers' = 'test:49092,test:49092,test:49092',
                'properties.group.id' = 'TEST',
                'format' = 'json',
                'scan.startup.mode' = 'earliest-offset'
            )
        """
        t_env.execute_sql(kfk_read)

        # StarRocks sink, using the V2 sink implementation.
        # Note the doubled backslashes: SQL should receive the literal text
        # \x01 / \x02, not the raw control characters.
        flink_to_sr = """
            CREATE TABLE test.test_sr (
                id BIGINT
            ) WITH (
                'connector' = 'starrocks',
                'jdbc-url' = 'jdbc:mysql://test:9030',
                'load-url' = 'test:8030',
                'database-name' = 'test',
                'table-name' = 'test_sr',
                'username' = 'user',
                'password' = 'pass',
                'sink.buffer-flush.interval-ms' = '1000',
                'sink.version' = 'V2',
                'sink.parallelism' = '1',
                'sink.properties.column_separator' = '\\x01',
                'sink.properties.row_delimiter' = '\\x02',
                'sink.properties.strict_mode' = 'true'
            )
        """
        t_env.execute_sql(flink_to_sr)

        # Submitting the INSERT is where the sink exception is raised.
        t_env.execute_sql(
            "INSERT INTO test.test_sr SELECT after.ID AS id FROM test.test_kafka"
        )
    except Exception as er:
        print(f"Job execution failed: {str(er)}")


if __name__ == '__main__':
    print('start')
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    print('table_env')
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()
    table_env = StreamTableEnvironment.create(env, settings)
    table_env.get_config().set("pipeline.name", "test")
    table_joins(table_env, env)
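One detail about how the job is submitted: execute_sql() returns as soon as the streaming INSERT is submitted. My understanding is that blocking on the returned TableResult makes failures surface in the client process instead of only in the TaskManager logs; a one-line sketch:

# Block until the job finishes or fails, so the sink exception reaches the client.
t_env.execute_sql(
    "INSERT INTO test.test_sr SELECT after.ID AS id FROM test.test_kafka"
).wait()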
jar files:
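Is the connector JAR simply not reaching the job's classloader? My understanding is that PyFlink only picks up connector JARs that are registered explicitly before the tables are created; a minimal sketch of that, with a placeholder path (not my actual file name):

# Placeholder path: substitute the actual StarRocks connector JAR built for Flink 1.17.
env.add_jars("file:///path/to/flink-connector-starrocks.jar")
# Equivalent via configuration:
# table_env.get_config().set(
#     "pipeline.jars", "file:///path/to/flink-connector-starrocks.jar"
# )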
