I have a source table with a column containing duplicate values. I need to insert its distinct values into the following Postgres JDBC sink table, assigning an incremental index to each of them:
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()  # unbounded
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql('''
    CREATE TABLE dim_table (
        index INT,
        some_column_with_duplicates STRING,
        PRIMARY KEY (index) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://...',
        'table-name' = '...'
    );
''')
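
For context, source_table is registered roughly like this (simplified; the datagen connector is just a stand-in for my real source, and the short field length only serves to make duplicate values likely):

table_env.execute_sql('''
    CREATE TABLE source_table (
        some_column_with_duplicates STRING
    ) WITH (
        'connector' = 'datagen',  -- stand-in for the real source
        'fields.some_column_with_duplicates.length' = '2'  -- short random strings, so duplicates occur
    );
''')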
Then I execute the following query:
table_env.execute_sql('''
    INSERT INTO dim_table (index, some_column_with_duplicates)
    SELECT
        ROW_NUMBER() OVER (ORDER BY some_column_with_duplicates) AS index,
        some_column_with_duplicates
    FROM (
        SELECT DISTINCT some_column_with_duplicates
        FROM source_table
    )
''')
But I get the following error:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate doesn't support consuming update changes