How to connect/use GCS buckets as a catalog and warehouse for Apache Iceberg via pyspark?

303 Views Asked by At

I'm completely new to iceberg via pyspark, and I'm having difficulties trying to connect to a gcs bucket in order to store my tables (and use a gcs bucket as a catalog)

I've been trying to connect to a GCS bucket using pyspark with apache iceberg as follows:

import pyspark
from pyspark.sql import SparkSession

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        #packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2,org.apache.iceberg:iceberg-gcp-bundle:1.4.2')
        #SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        #Configuring Catalog
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', 'gcs://iceberg_bucket_test/test/')
        .set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
# Run queries
spark.sql("CREATE TABLE hdfs_catalog.our_table (first_name string, last_name string) USING iceberg").show()
spark.sql("INSERT INTO hdfs_catalog.our_table VALUES ('FirstName', 'LastName')").show()
spark.sql("SELECT * FROM hdfs_catalog.our_table").show()

I've authenticated my user account using gcloud auth application-default login

After executing the python script, I'm hit with the following error:

org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-gcp-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3e06da03-b21a-4d7e-9bce-fa87cd9dd8b4;1.0
        confs: [default]
        found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.4.2 in central
        found org.apache.iceberg#iceberg-gcp-bundle;1.4.2 in central
:: resolution report :: resolve 64ms :: artifacts dl 2ms
        :: modules in use:
        org.apache.iceberg#iceberg-gcp-bundle;1.4.2 from central in [default]
        org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.4.2 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3e06da03-b21a-4d7e-9bce-fa87cd9dd8b4
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/3ms)
23/11/24 08:35:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark Running
Traceback (most recent call last):
  File "/Users/username/iceberg.py", line 22, in <module>
    spark.sql("CREATE TABLE hdfs_catalog.our_table (first_name string, last_name string) USING iceberg").show()
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/sql/session.py", line 1631, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o39.sql.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: gcs://iceberg_bucket_test/test
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
        at org.apache.iceberg.hadoop.HadoopCatalog.initialize(HadoopCatalog.java:112)
        at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:239)
        at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:284)
        at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:143)
        at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:555)
        at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
        at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
        at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:122)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:36)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
        at org.apache.spark.sql.catalyst.plans.logical.CreateTable.mapChildren(v2Commands.scala:446)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:30)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:27)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gcs"
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)
        ... 84 more

The program errors out since it can't fine the filesystem

py4j.protocol.Py4JJavaError: An error occurred while calling o39.sql.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: gcs://iceberg_bucket_test/test

I've tried both gcs:// and gs:// prefixes. I'm guessing that I'm missing some additional jars that needed to be added/configured? The iceberg documentation is super thin on using GCS as a catalog and as a warehouse.

0

There are 0 best solutions below