Special characters in Dataproc partition columns


I'm using Spark 3.1.2 on Google Dataproc image version 2.0.15-debian10 with a Dataproc managed metastore, version 3.1.2. The following snippet works fine with a GCS-backed table mydb.mytable:

from pyspark.sql import Row

spark.createDataFrame([Row(x='a', y=1)]).write.saveAsTable('mydb.mytable',
                       mode='overwrite',
                       partitionBy=['x'])

However, when the partition column value contains a special character:

spark.createDataFrame([Row(x='ก', y=1)]).write.saveAsTable('mydb.mytable',
                       mode='overwrite',
                       partitionBy=['x'])

the operation fails with the following exception:

21/08/03 10:44:27 ERROR hive.ql.metadata.Hive: MetaException(message:Exception thrown when executing query : SELECT DISTINCT 'org.apache.hadoop.hive.metastore.model.MPartition' AS `NUCLEUS_TYPE`,`A0`.`CREATE_TIME`,`A0`.`LAST_ACCESS_TIME`,`A0`.`PART_NAME`,`A0`.`PART_ID` FROM `PARTITIONS` `A0` LEFT OUTER JOIN `TBLS` `B0` ON `A0`.`TBL_ID` = `B0`.`TBL_ID` LEFT OUTER JOIN `DBS` `C0` ON `B0`.`DB_ID` = `C0`.`DB_ID` WHERE `B0`.`TBL_NAME` = ? AND `C0`.`NAME` = ? AND `A0`.`PART_NAME` = ? AND `C0`.`CTLG_NAME` = ?)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$add_partitions_req_result$add_partitions_req_resultStandardScheme.read(ThriftHiveMetastore.java)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$add_partitions_req_result$add_partitions_req_resultStandardScheme.read(ThriftHiveMetastore.java)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$add_partitions_req_result.read(ThriftHiveMetastore.java)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:88)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_add_partitions_req(ThriftHiveMetastore.java:1911)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.add_partitions_req(ThriftHiveMetastore.java:1898)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.add_partitions(HiveMetaStoreClient.java:627)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
    at com.sun.proxy.$Proxy46.add_partitions(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2336)
    at com.sun.proxy.$Proxy46.add_partitions(Unknown Source)
    at org.apache.hadoop.hive.ql.metadata.Hive.createPartitions(Hive.java:2097)
    at org.apache.spark.sql.hive.client.Shim_v0_13.createPartitions(HiveShim.scala:555)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$createPartitions$1(HiveClientImpl.scala:609)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
    at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
    at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
    at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
    at org.apache.spark.sql.hive.client.HiveClientImpl.createPartitions(HiveClientImpl.scala:602)
    at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createPartitions$1(HiveExternalCatalog.scala:1007)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
    at org.apache.spark.sql.hive.HiveExternalCatalog.createPartitions(HiveExternalCatalog.scala:989)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createPartitions(ExternalCatalogWithListener.scala:201)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createPartitions(SessionCatalog.scala:1050)
    at org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand.$anonfun$addPartitions$1(ddl.scala:792)
    at org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand.$anonfun$addPartitions$1$adapted(ddl.scala:774)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand.addPartitions(ddl.scala:774)
    at org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand.run(ddl.scala:672)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:192)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:753)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:727)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:626)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

The data itself gets written correctly to GCS, but updating the metastore fails. My guess is that this is caused by the managed metastore using latin1 encoding. Also, writing with df.write.parquet(...) works, since the metastore is not involved in that path.
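For reference, here is a minimal sketch of the metastore-free write that does work; the GCS path gs://my-bucket/mytable is just a placeholder, not a path from my actual setup:

from pyspark.sql import Row

df = spark.createDataFrame([Row(x='ก', y=1)])

# Writing the files directly never touches the Hive metastore, so the
# non-latin-1 partition value never has to be stored in its MySQL tables.
df.write.partitionBy('x').mode('overwrite').parquet('gs://my-bucket/mytable')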

Is it possible to configure Dataproc to properly handle any UTF-8 values in partition columns? I would like to avoid using any encoding (like URL encoding) in the application logic.
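For illustration, the kind of application-side encoding I'd prefer not to add would look roughly like this (the x_enc column and the UDF are only a sketch of the idea, not code I'm actually running):

from urllib.parse import quote

from pyspark.sql import Row
from pyspark.sql import functions as F

df = spark.createDataFrame([Row(x='ก', y=1)])

# Percent-encode the partition value so the metastore only ever sees ASCII,
# and partition by the encoded column instead of the raw one.
quote_udf = F.udf(lambda s: quote(s, safe=''))
df.withColumn('x_enc', quote_udf('x')) \
  .write.saveAsTable('mydb.mytable', mode='overwrite', partitionBy=['x_enc'])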

1 Answer

The issue is that the Hive Metastore's MySQL schema stores partition names in a latin1 column, which cannot represent that character:

PART_NAME varchar(767) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,

https://github.com/apache/hive/blob/master/metastore/scripts/upgrade/mysql/hive-schema-2.3.0.mysql.sql#L211
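A quick way to see why this particular character fails: ก (U+0E01) has no representation in latin-1 at all, so the generated partition name (x=ก) cannot be stored in that column. For example, in Python:

>>> 'ก'.encode('latin-1')
Traceback (most recent call last):
  ...
UnicodeEncodeError: 'latin-1' codec can't encode character '\u0e01' in position 0: ordinal not in range(256)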

You could reach out to the Hive community to check whether a larger character set is OK: https://github.com/apache/hive#useful-mailing-lists