FlinkSQL Table Gone after Session Closed


I accessed the JobManager session via sql-client.sh.

I wanted to stream a Kafka topic with Flink SQL, so I ran the script below to create a table. However, when I exit the session, the table is gone.


Flink SQL> CREATE TABLE en_trans (
>   `ID` INTEGER,
>   `purchaseId` INTEGER,
>    PRIMARY KEY (ID) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'en_trans',
>   'properties.bootstrap.servers' = 'kafka-netlex-cp-kafka:9092,....',
>   'properties.group.id' = 'en_group_test',
>   'key.format' = 'avro-confluent',
>   'value.format' = 'avro-confluent',
>   'key.avro-confluent.url' = 'http://kafka-sg-cp-schema-registry:8081',
>   'value.avro-confluent.url' = 'http://kafka-sg-cp-schema-registry:8081'
> );
Flink SQL> show tables;
+------------+
| table name |
+------------+
|   en_trans |
+------------+

After the session shuts down:

Flink SQL>

Shutting down the session...
done.

When I open the SQL Client again:

Flink SQL> show tables;

Empty set

Where does it go wrong?

My flink-conf.yaml is as follows:


    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    state.backend: rocksdb
    state.backend.type: rocksdb
    state.backend.incremental: true
    sql-client.execution.result-mode: tableau
    # sql-client.execution.max-table-result.rows: 10000
    # table.exec.state.ttl: 1000
    execution.runtime-mode: batch
    # state.checkpoints.dir: hdfs:///flink-checkpoints # location to store checkpoints
    # state.savepoints.dir: hdfs:///flink-savepoints
    state.backend.local-recovery: true
    process.taskmanager.working-dir: /pv
    parallelism.default: 2    

There are 2 best solutions below


The data belonging to the table is still there, it's just the table metadata that hasn't been preserved in a persistent catalog. If you recreate the table, you'll be right back where you were.
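A low-effort way to automate that recreation (assuming Flink 1.13+, where sql-client.sh accepts an `-i` initialization file) is to keep the DDL in a script that runs at every client start:

```sql
-- init.sql: re-registers the table on every client start.
-- Launch the client with: ./bin/sql-client.sh -i init.sql
CREATE TABLE en_trans (
  `ID` INTEGER,
  `purchaseId` INTEGER,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'en_trans',
  'properties.bootstrap.servers' = 'kafka-netlex-cp-kafka:9092,....',
  'properties.group.id' = 'en_group_test',
  'key.format' = 'avro-confluent',
  'value.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://kafka-sg-cp-schema-registry:8081',
  'value.avro-confluent.url' = 'http://kafka-sg-cp-schema-registry:8081'
);
```

This only re-creates the metadata at startup; for metadata that truly persists across sessions you still need a durable catalog.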


By default, the SQL Client uses an in-memory catalog, which goes away when you exit the client's session.

You will need to configure and use a durable catalog such as the JdbcCatalog or HiveCatalog. You can read more about catalogs here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/.
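As a sketch, registering a HiveCatalog (which stores arbitrary Flink table definitions, including Kafka-backed ones, in the Hive Metastore) might look like the following — the catalog name and the hive-conf-dir path are assumptions, and this requires the Hive connector jars on the classpath plus a running Hive Metastore:

```sql
-- Illustrative: register a persistent HiveCatalog so table
-- metadata survives across SQL Client sessions.
CREATE CATALOG my_hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'  -- assumed path to hive-site.xml
);
USE CATALOG my_hive;
-- Tables created from here on are stored in the Hive Metastore,
-- e.g.: CREATE TABLE en_trans (...) WITH ('connector' = 'upsert-kafka', ...);
```

After restarting the client, `USE CATALOG my_hive;` followed by `SHOW TABLES;` should list the previously created tables.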