Flink with Iceberg Catalog and Hive Metastore: org.apache.hadoop.fs.s3a.S3AFileSystem not found


I'm trying to set up Flink SQL with the Apache Iceberg catalog and Hive Metastore, but having no luck. Below are the steps I've taken on a clean Flink 1.18.1 installation, and the resulting error that I get.

Set up components

Run the Hive Metastore:

docker run --rm --detach --name hms-standalone \
           --publish 9083:9083 \
           ghcr.io/recap-build/hive-metastore-standalone:latest 
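
Before going further, a quick sanity check (not part of the original setup) that the metastore is actually listening:

# The Thrift port should accept connections; check the container logs if not
nc -vz localhost 9083
docker logs hms-standalone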

Run MinIO using Docker:

docker run --rm --detach --name minio \
            -p 9001:9001 -p 9000:9000 \
            -e "MINIO_ROOT_USER=admin" \
            -e "MINIO_ROOT_PASSWORD=password" \
            minio/minio server /data --console-address ":9001"

Provision a bucket:

docker exec minio \
    mc config host add minio http://localhost:9000 admin password
docker exec minio \
    mc mb minio/warehouse
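
To confirm the bucket exists (a quick check with the same mc client):

docker exec minio mc ls minio/
# Expected: an entry for warehouse/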

Add the required MinIO configuration to ./conf/flink-conf.yaml:

cat >> ./conf/flink-conf.yaml <<EOF
fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true
EOF
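
A quick check that the keys landed in the conf file Flink will actually read:

# Show the lines that were just appended
tail -n 4 ./conf/flink-conf.yaml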

Add JARs to Flink

Flink's S3 plugin:

mkdir ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/
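
Flink loads each plugin from its own subdirectory under ./plugins in an isolated classloader, so it's worth confirming the JAR landed in its own folder:

ls ./plugins/s3-fs-hadoop/
# Expected: flink-s3-fs-hadoop-1.18.1.jar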

Flink's Hive connector:

mkdir -p ./lib/hive
curl -s https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar -o ./lib/hive/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar

Dependencies for Iceberg:

mkdir ./lib/iceberg
curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar -o ./lib/iceberg/iceberg-flink-runtime-1.17-1.4.3.jar

mkdir -p ./lib/aws
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar -o ./lib/aws/hadoop-aws-3.3.6.jar
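
The SQL Client picks up every JAR under ./lib, including subdirectories (the ps output further down confirms this), so the layout can be sanity-checked with:

find ./lib -name '*.jar'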

Run it

Set the Hadoop dependency:

export HADOOP_CLASSPATH=$(~/hadoop/hadoop-3.3.2/bin/hadoop classpath)
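
Note that in a stock Hadoop distribution hadoop-aws usually sits under share/hadoop/tools/lib and is not part of the default hadoop classpath output, so it's worth checking what actually made it in:

# If nothing matches, hadoop-aws is not coming in via HADOOP_CLASSPATH
echo $HADOOP_CLASSPATH | tr ':' '\n' | grep -i aws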

Launch SQL Client:

./bin/sql-client.sh
Flink SQL> CREATE CATALOG c_iceberg_hive WITH (
>    'type' = 'iceberg',
>    'client.assume-role.region' = 'us-east-1',
>    'warehouse' = 's3a://warehouse',
>    's3.endpoint' = 'http://localhost:9000',
>    's3.path-style-access' = 'true',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)

Flink SQL>

Full stacktrace

Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation b685c995-3280-4a9e-b6c0-18ab9369d790.
       at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
       at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       ... 1 more
Caused by: org.apache.flink.table.api.TableException: Could not execute CREATE DATABASE: (catalogDatabase: [{}], catalogName: [c_iceberg_hive], databaseName: [db_rmoff], ignoreIfExists: [
       at org.apache.flink.table.operations.ddl.CreateDatabaseOperation.execute(CreateDatabaseOperation.java:90)
       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1092)
       at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:556)
       at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:444)
       at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:207)
       at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
       at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
       at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
       ... 7 more
Caused by: java.lang.RuntimeException: Failed to create namespace db_rmoff in Hive Metastore
       at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
       at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:222)
       at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:213)
       at org.apache.flink.table.catalog.CatalogManager.createDatabase(CatalogManager.java:1381)
       at org.apache.flink.table.operations.ddl.CreateDatabaseOperation.execute(CreateDatabaseOperation.java:84)
       ... 14 more
Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
       at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
       at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
       at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
       at com.sun.proxy.$Proxy35.createDatabase(Unknown Source)
       at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
       at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
       at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
       ... 18 more

Diagnostics

Verify that hadoop-aws is on the classpath:

❯ ps -ef|grep sql-client|grep hadoop-aws
  501 51499 45632   0  7:38pm ttys007    0:06.81 /Users/rmoff/.sdkman/candidates/java/current/bin/java -XX:+IgnoreUnrecognizedVMOptions
    --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
    --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
    --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
    --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED
    --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED
    --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
    --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED
    --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
    --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
    -Dlog.file=/Users/rmoff/flink/flink-1.18.1/log/flink-rmoff-sql-client-asgard08.log
    -Dlog4j.configuration=file:/Users/rmoff/flink/flink-1.18.1/conf/log4j-cli.properties
    -Dlog4j.configurationFile=file:/Users/rmoff/flink/flink-1.18.1/conf/log4j-cli.properties
    -Dlogback.configurationFile=file:/Users/rmoff/flink/flink-1.18.1/conf/logback.xml
    -classpath /Users/rmoff/flink/flink-1.18.1/lib/aws/hadoop-aws-3.3.6.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-cep-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/l[…]

Confirm that the JAR holds the S3AFileSystem class:

❯ jar tvf lib/aws/hadoop-aws-3.3.6.jar|grep -i filesystem.class
157923 Sun Jun 18 08:56:00 BST 2023 org/apache/hadoop/fs/s3a/S3AFileSystem.class
  3821 Sun Jun 18 08:56:02 BST 2023 org/apache/hadoop/fs/s3native/NativeS3FileSystem.class

I get the same error if I strip the CREATE CATALOG back to bare-bones too:

Flink SQL> CREATE CATALOG c_iceberg_hive2 WITH (
>    'type' = 'iceberg',
>    'warehouse' = 's3a://warehouse',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive2;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)

Java version:

❯ java --version
openjdk 11.0.21 2023-10-17
OpenJDK Runtime Environment Temurin-11.0.21+9 (build 11.0.21+9)
OpenJDK 64-Bit Server VM Temurin-11.0.21+9 (build 11.0.21+9, mixed mode)

Edit 01

Other things that I've tried:

  1. Using Flink 1.17.1 (to align with the 1.17 version of the Iceberg JAR)
  2. Using Hadoop 3.3.4 components throughout
  3. Moving the JARs into ./lib instead of subfolders
  4. Removing the Flink s3-fs-hadoop plugin
  5. Adding iceberg-aws-bundle-1.4.3.jar and aws-java-sdk-bundle-1.12.648.jar (separately, and together)
  6. Using the same setup to write to S3 (MinIO) in Parquet format, which works fine.

More diagnostics:

If I add the three SQL statements (CREATE CATALOG / USE CATALOG / CREATE DATABASE) to a file and launch SQL Client with verbose class logging:

JVM_ARGS=-verbose:class ./bin/sql-client.sh -f ../iceberg.sql > iceberg.log

I get this output, showing that the hadoop-aws JAR just isn't picked up, even though it's in the classpath.
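
One way to confirm that from the verbose output (a diagnostic sketch; the exact log format varies by JDK):

# Look for the class ever being loaded, and for which JAR it was loaded from
grep -i 'S3AFileSystem' iceberg.log
grep -i 'hadoop-aws' iceberg.log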

If I add Flink's s3-fs-hadoop plugin back in, we can see it being picked up (log), but I still get the same failure.


Edit 02

If I switch from s3a to s3, I get a different error ¯\_(ツ)_/¯

Flink SQL> CREATE CATALOG c_iceberg_hive WITH (
>     'type' = 'iceberg',
>     'client.assume-role.region' = 'us-east-1',
>     'warehouse' = 's3://warehouse',
>     's3.endpoint' = 'http://localhost:9000',
>     's3.path-style-access' = 'true',
>     'catalog-type'='hive',
>     'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:Got exception: org.apache.hadoop.fs.UnsupportedFileSystemException No FileSystem for scheme "s3")

If I add in io-impl, I get yet another error, which again seems (to my limited understanding) to suggest that a required AWS JAR isn't being picked up:

Flink SQL> CREATE CATALOG c_iceberg_hive2 WITH (
>    'type' = 'iceberg',
>    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
>    'client.assume-role.region' = 'us-east-1',
>    'warehouse' = 's3://warehouse',
>    's3.endpoint' = 'http://localhost:9000',
>    's3.path-style-access' = 'true',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: software.amazon.awssdk.services.s3.model.S3Exception
2 Answers

Accepted answer (Aleksandr Pilipenko):

The error you observe originates from the Hive Metastore server, not from Flink:

Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
       at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
       at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
       at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
       at com.sun.proxy.$Proxy35.createDatabase(Unknown Source)
       at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)

This indicates that the error was returned via the Hive Thrift API.

The Docker image used here to run Hive does not include hadoop-aws; you need to add it yourself, or use another Hive image that includes the required dependencies.
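
For example, one option (a sketch only; the library path inside this image is an assumption, so check the image layout first) is to mount the JARs into the container when starting it:

# Mount hadoop-aws plus the AWS SDK bundle version mentioned in the question's
# Edit 01 into the HMS image. /opt/hive/lib is an assumed path; adjust to the image.
docker run --rm --detach --name hms-standalone \
           --publish 9083:9083 \
           --volume $PWD/hadoop-aws-3.3.6.jar:/opt/hive/lib/hadoop-aws-3.3.6.jar \
           --volume $PWD/aws-java-sdk-bundle-1.12.648.jar:/opt/hive/lib/aws-java-sdk-bundle-1.12.648.jar \
           ghcr.io/recap-build/hive-metastore-standalone:latest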

Answer (Alex Merced):

For me, it worked once I made sure the AWS_ACCESS_KEY and AWS_SECRET_ACCESS_KEY environment variables were defined with my credentials.

Here is a tutorial I wrote that shows an end-to-end example using Docker Compose: https://www.dremio.com/blog/using-flink-with-apache-iceberg-and-nessie/
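
For reference, a minimal sketch with the MinIO credentials used in the question (note that the AWS SDK's standard variable name is AWS_ACCESS_KEY_ID):

# Export credentials before launching the SQL Client so the AWS SDK's default
# credential chain can find them
export AWS_ACCESS_KEY_ID=admin
export AWS_SECRET_ACCESS_KEY=password
./bin/sql-client.sh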