I have a problem configuring the Apache Iceberg destination connector in Airbyte. I set up the destination successfully, but when I try to sync data from other sources (MySQL, CSV files) to Apache Iceberg through this destination, Airbyte throws an error:
2024-01-24 03:51:23 destination > 2024-01-24 03:51:23 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-01-24 03:51:23 destination > 2024-01-24 03:51:23 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-01-24 03:51:26 destination > 2024-01-24 03:51:26 ERROR i.a.c.i.b.AirbyteExceptionHandler(uncaughtException):26 - Something went wrong in the connector. See the logs for more details.
2024-01-24 03:51:26 destination > java.lang.NoSuchMethodError: org.apache.logging.slf4j.Log4jLoggerFactory: method 'void <init>()' not found
2024-01-24 03:51:26 destination > at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:53) ~[log4j-slf4j-impl-2.17.2.jar:2.17.2]
2024-01-24 03:51:26 destination > at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:41) ~[log4j-slf4j-impl-2.17.2.jar:2.17.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j2(Logging.scala:232) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:129) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:115) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:109) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.SparkContext.initializeLogIfNecessary(SparkContext.scala:84) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:106) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.SparkContext.initializeLogIfNecessary(SparkContext.scala:84) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.log(Logging.scala:53) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.log$(Logging.scala:51) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.SparkContext.log(SparkContext.scala:84) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.logInfo(Logging.scala:61) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.internal.Logging.logInfo$(Logging.scala:60) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.SparkContext.logInfo(SparkContext.scala:84) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.SparkContext.<init>(SparkContext.scala:195) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2714) ~[spark-core_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:953) ~[spark-sql_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at scala.Option.getOrElse(Option.scala:201) ~[scala-library-2.13.10.jar:?]
2024-01-24 03:51:26 destination > at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:947) ~[spark-sql_2.13-3.3.2.jar:3.3.2]
2024-01-24 03:51:26 destination > at io.airbyte.integrations.destination.iceberg.IcebergDestination.getConsumer(IcebergDestination.java:85) ~[io.airbyte.airbyte-integrations.connectors-destination-iceberg-0.50.41.jar:?]
2024-01-24 03:51:26 destination > at io.airbyte.cdk.integrations.base.Destination.getSerializedMessageConsumer(Destination.java:54) ~[airbyte-cdk-core-0.2.0.jar:?]
2024-01-24 03:51:26 destination > at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:186) ~[airbyte-cdk-core-0.2.0.jar:?]
2024-01-24 03:51:26 destination > at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.java:125) ~[airbyte-cdk-core-0.2.0.jar:?]
2024-01-24 03:51:26 destination > at io.airbyte.integrations.destination.iceberg.IcebergDestination.main(IcebergDestination.java:42) ~[io.airbyte.airbyte-integrations.connectors-destination-iceberg-0.50.41.jar:?]
2024-01-24 03:51:26 platform > readFromDestination: exception caught
java.lang.IllegalStateException: Destination process is still alive, cannot retrieve exit value.
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
at io.airbyte.workers.internal.DefaultAirbyteDestination.getExitValue(DefaultAirbyteDestination.java:191) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:477) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:228) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-01-24 03:51:26 platform > writeToDestination: exception caught
java.io.IOException: Broken pipe
at java.base/java.io.FileOutputStream.writeBytes(Native Method) ~[?:?]
at java.base/java.io.FileOutputStream.write(FileOutputStream.java:367) ~[?:?]
at java.base/java.io.BufferedOutputStream.implWrite(BufferedOutputStream.java:217) ~[?:?]
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:206) ~[?:?]
at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:309) ~[?:?]
at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:405) ~[?:?]
at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:410) ~[?:?]
at java.base/sun.nio.cs.StreamEncoder.lockedFlush(StreamEncoder.java:214) ~[?:?]
at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:201) ~[?:?]
at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:262) ~[?:?]
at java.base/java.io.BufferedWriter.implFlush(BufferedWriter.java:372) ~[?:?]
at java.base/java.io.BufferedWriter.flush(BufferedWriter.java:359) ~[?:?]
at io.airbyte.workers.internal.DefaultAirbyteMessageBufferedWriter.flush(DefaultAirbyteMessageBufferedWriter.java:31) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInputWithNoTimeoutMonitor(DefaultAirbyteDestination.java:140) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:133) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:443) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:256) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-01-24 03:51:26 platform > readFromDestination: done. (writeToDestFailed:true, dest.isFinished:false)
2024-01-24 03:51:26 platform > writeToDestination: done. (forDest.isDone:false, isDestRunning:true)
2024-01-24 03:51:26 platform > processMessage: done. (fromSource.isDone:false, forDest.isClosed:true)
2024-01-24 03:51:26 platform > readFromSource: exception caught
java.lang.IllegalStateException: Source process is still alive, cannot retrieve exit value.
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
at io.airbyte.workers.internal.DefaultAirbyteSource.getExitValue(DefaultAirbyteSource.java:127) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:363) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:235) ~[io.airbyte-airbyte-commons-worker-0.50.44.jar:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
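If I read the trace right, the destination dies the moment it builds its SparkSession and Spark initializes logging: the StaticLoggerBinder in log4j-slf4j-impl-2.17.2 (an SLF4J 1.x binding) calls a no-arg Log4jLoggerFactory constructor, but the Log4jLoggerFactory class that actually gets loaded does not declare one, which suggests two different Log4j/SLF4J bridge versions are mixed on the connector's classpath. The platform-side IllegalStateException and "Broken pipe" errors afterwards look like consequences of the destination process dying, not separate problems. Which jar the contested class comes from can be checked by running something like this with the connector's jars on the classpath (a minimal diagnostic sketch of my own, not Airbyte code):

// Slf4jBindingCheck.java -- my own diagnostic, not part of Airbyte.
// Prints which jar Log4jLoggerFactory was loaded from and which
// constructors it declares; the SLF4J 1.x binding needs the no-arg one.
import java.lang.reflect.Constructor;
import java.security.CodeSource;

public class Slf4jBindingCheck {
    public static void main(String[] args) throws Exception {
        Class<?> factory = Class.forName("org.apache.logging.slf4j.Log4jLoggerFactory");
        CodeSource src = factory.getProtectionDomain().getCodeSource();
        System.out.println("Loaded from: "
                + (src != null ? src.getLocation() : "<bootstrap>"));
        for (Constructor<?> ctor : factory.getDeclaredConstructors()) {
            System.out.println("Declared: " + ctor);
        }
    }
}

If only a parameterized constructor shows up (e.g. the one shipped in log4j-slf4j2-impl), that would explain the NoSuchMethodError.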
I deployed my lakehouse with Docker; it includes MinIO and an Iceberg REST catalog. Here is my docker-compose file:
version: "3"
services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8081:8080
      - 10000:10000
      - 10001:10001
  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    networks:
      iceberg_net:
    ports:
      - 8181:8181
    volumes:
      - ./iceberg-metadata:/tmp
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    volumes:
      - ./minio-data:/data
    ports:
      - 9091:9001
      - 9090:9000
    command: ["server", "/data", "--console-address", ":9001"]
  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
networks:
  iceberg_net:
I've connected to Iceberg through Trino and it works fine, but connecting through Airbyte always throws this error. Do I need to customize the Airbyte source code (https://github.com/airbytehq/airbyte) to fix this?
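For context on where it fails: per the stack trace, IcebergDestination.getConsumer is only building an internal SparkSession when the logging clash kills it; no Iceberg or MinIO code has run yet. Roughly, a session against my stack would look like the sketch below. The keys are standard Iceberg Spark catalog properties and the values come from my docker-compose file (the localhost ports assume it runs on the host where the stack is published), but this is my own illustration, not Airbyte's actual configuration:

// IcebergSparkSketch.java -- my illustration of the kind of session the
// connector builds, NOT Airbyte's code. Needs spark-sql plus the
// iceberg-spark-runtime and iceberg-aws jars; S3 credentials are taken
// from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables.
import org.apache.spark.sql.SparkSession;

public class IcebergSparkSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("iceberg-destination-sketch")
                .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.rest.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
                .config("spark.sql.catalog.rest.uri", "http://localhost:8181")
                .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                .config("spark.sql.catalog.rest.s3.endpoint", "http://localhost:9090")
                .config("spark.sql.catalog.rest.s3.path-style-access", "true")
                // getOrCreate() is where the NoSuchMethodError surfaces,
                // while Spark initializes its logging.
                .getOrCreate();
        spark.stop();
    }
}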