I opened a SQL client session on the job manager (sqlclient.sh).
I want to stream a Kafka topic via Flink SQL, so I executed the statement below to create a table. However, when I exit the session, the table is gone.
Flink SQL> CREATE TABLE en_trans (
> `ID` INTEGER,
> `purchaseId` INTEGER,
> PRIMARY KEY (ID) NOT ENFORCED
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'en_trans',
> 'properties.bootstrap.servers' = 'kafka-netlex-cp-kafka:9092,....',
> 'properties.group.id' = 'en_group_test',
> 'key.format' = 'avro-confluent',
> 'value.format' = 'avro-confluent',
> 'key.avro-confluent.url' = 'http://kafka-sg-cp-schema-registry:8081',
> 'value.avro-confluent.url' = 'http://kafka-sg-cp-schema-registry:8081'
> );
Flink SQL> show tables;
+------------+
| table name |
+------------+
| en_trans |
+------------+
After the session shuts down:
Flink SQL>
Shutting down the session...
done.
I start the SQL client again:
Flink SQL> show tables;
Empty set
Where did it go wrong?
My flink-conf.yaml is as follows:
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
state.backend: rocksdb
state.backend.type: rocksdb
state.backend.incremental: true
sql-client.execution.result-mode: tableau
# sql-client.execution.max-table-result.rows: 10000
# table.exec.state.ttl: 1000
execution.runtime-mode: batch
# state.checkpoints.dir: hdfs:///flink-checkpoints # location to store checkpoints
# state.savepoints.dir: hdfs:///flink-savepoints
state.backend.local-recovery: true
process.taskmanager.working-dir: /pv
parallelism.default: 2
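The data belonging to the table is still there in the Kafka topic. Only the table metadata is gone: by default the SQL client keeps table definitions in an in-memory catalog that lasts only as long as the session, not in a persistent catalog. If you recreate the table, you'll be right back where you were.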
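
To keep the table definition across sessions, one option is to register a persistent catalog and create the table inside it. The following is only a minimal sketch, assuming a Hive Metastore is reachable and the Flink Hive connector JARs are on the SQL client's classpath. The catalog name `my_hive` and the directory `/opt/hive-conf` are hypothetical placeholders, not values taken from your setup.

```sql
-- Register a catalog backed by a Hive Metastore.
-- 'my_hive' and '/opt/hive-conf' are hypothetical; the directory is
-- assumed to contain a hive-site.xml pointing at your metastore.
CREATE CATALOG my_hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'
);

-- Make it the current catalog for this session.
USE CATALOG my_hive;

-- Now run the CREATE TABLE en_trans (...) statement from the question once.
-- Its metadata is then stored in the metastore instead of in session memory.

-- In a later session, register the catalog again, switch to it, and check:
SHOW TABLES;
```

The catalog registration itself is still per session, so the `CREATE CATALOG` and `USE CATALOG` statements have to be repeated each time, but the tables inside the catalog should survive. If running a metastore is not an option, the SQL client also accepts an initialization file (`-i <file>`), so the `CREATE TABLE` statement can be replayed automatically at the start of every session.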