I access to job manager session sqlclient.sh
I wanted to stream the kafka topic via flinksql so i execute the script below to create table, however when I exit the session, the table gone.
Flink SQL> CREATE TABLE en_trans (
> `ID` INTEGER,
> `purchaseId` INTEGER,
> PRIMARY KEY (ID) NOT ENFORCED
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'en_trans',
> 'properties.bootstrap.servers' = 'kafka-netlex-cp-kafka:9092,....',
> 'properties.group.id' = 'en_group_test',
> 'key.format' = 'avro-confluent',
> 'value.format' = 'avro-confluent',
> 'key.avro-confluent.url' = 'http://kafka-sg-cp-schema-registry:8081',
> 'value.avro-confluent.url' = 'http://kafka-sg-cp-schema-registry:8081'
> );
Flink SQL> show tables;
+------------+
| table name |
+------------+
| en_trans |
+------------+
After Session shut down
Flink SQL>
Shutting down the session...
done.
I go to sqlclient again
Flink SQL> show tables;
Empty set
Where is it goes wrong ??
the flink-conf.yaml as below
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
state.backend: rocksdb
state.backend.type: rocksdb
state.backend.incremental: true
sql-client.execution.result-mode: tableau
# sql-client.execution.max-table-result.rows: 10000
# table.exec.state.ttl: 1000
execution.runtime-mode: batch
# state.checkpoints.dir: hdfs:///flink-checkpoints # location to store checkpoints
# state.savepoints.dir: hdfs:///flink-savepoints
state.backend.local-recovery: true
process.taskmanager.working-dir: /pv
parallelism.default: 2
The SQL Client creates an in-memory one which will go away when you exit the client's session.
You will need to configure and use a durable catalog such as the JdbcCatalog or HiveCatalog. You can read more about catalogs here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/.