Cannot load user class: com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2

31 Views Asked by At

I have pyFlink script to load from Kafka to StarRocks database. Kafka connection is successful, however during calling sink data to Starocks receiving exception

Flink version 1.17, StarRocks 5.1.0
Receiving exception:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.

Code:

from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.common.restart_strategy import RestartStrategies

def table_joins(t_env, e):
    try:
        cr_db = "create database test"
        t_env.execute_sql(cr_db)
        kfk_read = """
               CREATE TABLE test.test_kafka ( 
                `after` ROW(`ID` BIGINT)
                ) WITH ( 
                 'connector' = 'kafka',
                 'topic' = 'TEST',
                 'properties.bootstrap.servers' = 'test:49092,test:49092,test:49092',
                 'properties.group.id' = 'TEST',
                 'format' = 'json',
                 'scan.startup.mode' = 'earliest-offset'
                );
           """
        t_env.execute_sql(kfk_read)

        flink_to_str = """
                   CREATE TABLE test.test_sr (
            id bigint
            ) WITH (
                'connector' = 'starrocks',
                'jdbc-url' = 'jdbc:mysql://test:9030',
                'load-url' = 'test:8030',
                'database-name' = 'test',
                'table-name' = 'test_sr',
                'username' = 'user',
                'password' = 'pass',
                'sink.buffer-flush.interval-ms' = '1000',
                'sink.version' = 'V2',
                'sink.parallelism' = '1',
                'sink.properties.column_separator' = '\x01',
                'sink.properties.row_delimiter' = '\x02',
                'sink.properties.strict_mode' = 'true'
            );
               """
        t_env.execute_sql(flink_to_str)

        t_env.execute_sql("insert into test.test_sr select after.ID as id from test.test_kafka")


    except Exception as er:
        print(f"Job execution failed: {str(er)}")


if __name__ == '__main__':
    print('start')
    env = StreamExecutionEnvironment.get_execution_environment()

    env.set_parallelism(1)

    print('table_env')
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    table_env = StreamTableEnvironment.create(env, settings)
    table_env.get_config().set("pipeline.name", "test")

    table_joins(table_env, env)

jar files:

jar files

0

There are 0 best solutions below