Is there any way to assign unique incremental indexes to values in a dimension table with PyFlink?


I have a source table with a column containing duplicate values that need to be inserted into the following Postgres JDBC sink table, and incremental indexes should be assigned to each of them:

    env_settings = EnvironmentSettings.in_streaming_mode()  # unbounded
    table_env = TableEnvironment.create(env_settings)

    table_env.execute_sql('''
        CREATE TABLE dim_table (
            index INT,
            some_column_with_duplicates STRING,
            PRIMARY KEY (index) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://...'
        );
    ''')
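To pin down the intended semantics, here is a plain-Python sketch (a hypothetical helper, not part of the Flink job) that mirrors `ROW_NUMBER() OVER (ORDER BY ...)` applied to the distinct values: duplicates collapse to one row, and each distinct value gets the next index in sorted order:

```python
def assign_indexes(values):
    # Mirror ROW_NUMBER() OVER (ORDER BY col) on the DISTINCT values:
    # deduplicate, sort, then number from 1.
    return {v: i + 1 for i, v in enumerate(sorted(set(values)))}

print(assign_indexes(["b", "a", "b", "c"]))  # {'a': 1, 'b': 2, 'c': 3}
```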

Then I execute the following query:

    table_env.execute_sql('''
        INSERT INTO dim_table (index, some_column_with_duplicates)
        SELECT
            ROW_NUMBER() OVER (ORDER BY some_column_with_duplicates) AS index,
            some_column_with_duplicates
        FROM (
            SELECT DISTINCT some_column_with_duplicates
            FROM source_table
        )
    ''')
But it fails with the following error:

    pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate doesn't support consuming update changes
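For context on the error: in streaming mode, `SELECT DISTINCT` produces an updating (retract) stream, and Flink's OVER aggregate (`StreamPhysicalOverAggregate`) can only consume insert-only input. If a bounded, one-off backfill of the dimension table is acceptable (an assumption about the use case), one commonly suggested workaround is to run the same statement under batch semantics, where the DISTINCT subquery emits insert-only rows:

```sql
-- Hedged sketch: switch the job to bounded (batch) execution, e.g. via the
-- SQL client / table config; assumes source_table is bounded for the backfill.
-- In PyFlink this corresponds to EnvironmentSettings.in_batch_mode().
SET 'execution.runtime-mode' = 'batch';
```

This does not solve continuous index assignment on an unbounded stream; it only sidesteps the update-stream limitation for a bounded load.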