How to save query results into a Hive table, in Dataproc cluster, using `.saveAsTable()`?


I have query results that I am trying to write into a Hive table on GCP; the table's location points to a GCS bucket path. When I execute the saveAsTable() method, it fails with the following error:

org.apache.spark.SparkException: Job aborted.
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)

Here is my code:

  import org.apache.spark.sql.SaveMode.{Append, Overwrite}
  import org.apache.spark.sql.functions.col

  sparkSession.sql(eiSqlQuery)
      .repartition(col("col_1"))   // repartition takes Column expressions, not a bare String
      .write
      .mode(if (AppConfig.isHistoryLoad) Overwrite else Append)
      .partitionBy("col_2")
      .saveAsTable("hive_schema.hive_table_name")

I have also tried writing the Parquet files directly to the GCS path instead of using saveAsTable, and also creating the table first and then using insertInto; all of these fail with the same error (roughly sketched below).
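For reference, the alternative attempts looked roughly like this (the GCS path and names here are placeholders, not my real ones):

  // Variant 1: write Parquet directly to the GCS path backing the table
  // (gs://my-bucket/... is a placeholder path)
  sparkSession.sql(eiSqlQuery)
      .repartition(col("col_1"))
      .write
      .mode(if (AppConfig.isHistoryLoad) Overwrite else Append)
      .partitionBy("col_2")
      .parquet("gs://my-bucket/path/to/hive_table_name")

  // Variant 2: create the partitioned table first, then insert into it;
  // insertInto uses the table's own partitioning, so no partitionBy here
  sparkSession.sql(eiSqlQuery)
      .repartition(col("col_1"))
      .write
      .mode(if (AppConfig.isHistoryLoad) Overwrite else Append)
      .insertInto("hive_schema.hive_table_name")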

What are my other options here?

1 Answer:

First, you need a SparkSession with Hive support enabled, for example:

// Scala
import java.io.File
import org.apache.spark.sql.SparkSession

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
    .builder()
    .appName("Spark Hive Example")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .enableHiveSupport()
    .getOrCreate()

Then you should be able to create Hive tables, for example (note the USING hive clause in the SQL):

// Scala
sql("CREATE TABLE IF NOT EXISTS my_table (key INT, value STRING) USING hive")

Then you can save a DataFrame into the table:

// Scala
df.write.partitionBy("key").format("hive").saveAsTable("my_table")

See the Spark Hive Tables documentation and its example code.