I have some datasets (CSV and Parquet files) that I want to transform and build into a Hudi table in my S3 bucket, which has Object Lock enabled.
From the official PySpark documentation I understand that Content-MD5 for S3 buckets with Object Lock is not currently supported officially, but I am hoping someone can help me with this, since it is a relevant case for larger organisations whose S3 buckets always have Object Lock enabled by policy. Note: creating a bucket without Object Lock is not an option for me.
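For context, the restriction is easy to reproduce outside Spark: per the error message further down, S3 rejects a PutObject to a bucket with Object Lock parameters unless the request carries a Content-MD5 (or x-amz-checksum-*) header. As a minimal illustration of a compliant upload (plain boto3, with placeholder bucket name and key; not part of my Spark pipeline):

import base64
import hashlib

import boto3

s3 = boto3.client('s3')
body = b'object lock smoke test'

# Object Lock buckets require an integrity header on every PutObject;
# here we supply the base64-encoded MD5 digest of the payload ourselves.
content_md5 = base64.b64encode(hashlib.md5(body).digest()).decode()

s3.put_object(
    Bucket='my-object-lock-bucket',  # placeholder
    Key='datalake/smoke-test.txt',   # placeholder
    Body=body,
    ContentMD5=content_md5,
)

This appears to be exactly the header that the S3A PUT in the stack trace below never sends.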
I have already written the following code:
from methods.fast_hudi import FastHudi
import os
from pyspark.sql.functions import concat_ws, md5
import zipfile
import hashlib

with zipfile.ZipFile(r'./data/package.zip', 'r') as zip_ref:
    zip_ref.extractall(r'./data/package')

os.environ['AWS_ACCESS_KEY'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
os.environ['AWS_SESSION_TOKEN'] = ''

spark_config = {
    'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.13.1,org.apache.spark:spark-avro_2.13:3.4.0,org.apache.calcite:calcite-core:1.34.0,com.amazonaws:aws-java-sdk-bundle:1.12.486',
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog',
    'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension',
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
    'spark.hadoop.fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'spark.hadoop.fs.s3a.access.key': os.getenv('AWS_ACCESS_KEY', None),
    'spark.hadoop.fs.s3a.secret.key': os.getenv('AWS_SECRET_ACCESS_KEY', None),
    'spark.hadoop.fs.s3a.session.token': os.getenv('AWS_SESSION_TOKEN', None),
    'spark.hadoop.fs.s3a.aws.credentials.provider': "com.amazonaws.auth.profile.ProfileCredentialsProvider",
    'spark.driver.memory': '16g',
    'spark.hadoop.fs.s3a.fast.upload': 'true',
    'spark.hadoop.fs.s3a.upload.buffer': 'bytebuffer'
}

spark = FastHudi('Showroom Variants Bronze Layer - Hudi', spark_config)
base_data = spark.read('csv', 'data/package/showroom_variant.csv')
base_data.show()
hudi_table_name = 'showroom_variants'
hoodie_options = {
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'showroom_variant_id',
    'hoodie.datasource.write.precombine.field': 'global_brand_name',
    'hoodie.datasource.write.partitionpath.field': 'global_sales_parent_name,global_brand_name,body_type',
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.table.name': hudi_table_name,
    'hoodie.deltastreamer.source.dfs.listing.max.fileid': '-1',
    'hoodie.datasource.write.hive_style_partitioning': 'true'
}

spark.write_as_hudi(
    base_data,
    'overwrite',
    'org.apache.hudi',
    f's3a://<s3_bucket>/datalake/{hudi_table_name}',
    hoodie_options
)
Here are the classes used in the imports (FastSpark from methods/fast_spark.py and FastHudi from methods/fast_hudi.py):
from pyspark.sql import SparkSession
from functools import reduce
class FastSpark:
    def __init__(self, app_name: str, spark_config: dict = None) -> None:
        self.spark_obj = None
        self.app_name = app_name
        self.spark_config = spark_config

    def get_or_create_session(self) -> SparkSession:
        if self.spark_config is None:
            return SparkSession.builder \
                .appName(self.app_name) \
                .getOrCreate()
        else:
            spark_session = reduce(
                lambda x, y: x.config(y[0], y[1]),
                self.spark_config.items(),
                SparkSession.builder.appName(self.app_name)
            )
            return spark_session.getOrCreate()

    def read(self, read_format: str, load_path: str, read_options: dict = None):
        self.spark_obj = self.get_or_create_session()
        if read_options is None:
            return self.spark_obj.read.format(read_format) \
                .load(load_path)
        else:
            read_obj = reduce(
                lambda x, y: x.option(y[0], y[1]),
                read_options.items(),
                self.spark_obj.read.format(read_format)
            )
            return read_obj.load(load_path)
from methods.fast_spark import FastSpark
import os
from functools import reduce
class FastHudi(FastSpark):
    def __init__(self, app_name: str, spark_config: dict = None) -> None:
        super().__init__(app_name, spark_config)
        self.AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY', None)
        self.AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', None)
        self.spark = self.get_or_create_session()
        if self.AWS_ACCESS_KEY is None or self.AWS_SECRET_ACCESS_KEY is None:
            raise ValueError("Both of the following variables need to be set: AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY")

    def write_as_hudi(self, write_df, write_mode: str, write_format: str = 'org.apache.hudi',
                      target_path: str = None, hoodie_options: dict = None, header=None):
        write_df.write.format(write_format) \
            .options(**hoodie_options) \
            .mode(write_mode) \
            .save(target_path, header=header)

    def stop_spark(self):
        self.spark.stop()
Library Version: pyspark == 3.4.0
When I run the above code, it throws this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o121.save. : org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object on datalake/showroom_variants: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <request_id>; Proxy: null), S3 Extended Request ID: <some_id>:InvalidRequest: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <extended_request_id>; Proxy: null)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:249)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:119)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:318)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:293)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:4532)
at org.apache.hadoop.fs.s3a.S3AFileSystem.access$1900(S3AFileSystem.java:259)
at org.apache.hadoop.fs.s3a.S3AFileSystem$MkdirOperationCallbacksImpl.createFakeDirectory(S3AFileSystem.java:3461)
at org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:121)
at org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:45)
at org.apache.hadoop.fs.s3a.impl.ExecutingStoreOperation.apply(ExecutingStoreOperation.java:76)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:3428)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2388)
at org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:481)
at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:1201)
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:323)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: DKTYYJHRJGJZFQ1X; S3 Extended Request ID: 37QbiLXXUA+I4SaRhZZvqSNQz5P2zPfykuw9nzmgmRhBpYPN1c9GdN76pzZec7Rpd7b0ek4IdXs9GQcN99eUUfkwd3+Xofv2lq9At781zwI=; Proxy: null), S3 Extended Request ID: 37QbiLXXUA+I4SaRhZZvqSNQz5P2zPfykuw9nzmgmRhBpYPN1c9GdN76pzZec7Rpd7b0ek4IdXs9GQcN99eUUfkwd3+Xofv2lq9At781zwI=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5470)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5417)
at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:422)
at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6551)
at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1862)
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1822)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2877)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2874)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$32(S3AFileSystem.java:4534)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
... 63 more
EDIT (2024-02-18):
I am open to workarounds, or even to a different approach/framework than Apache Hudi. The goal is to build data lakes as efficiently as I could with Apache Hudi, but Amazon S3 Object Lock seems to be a blocker here. Open to suggestions; one idea I am weighing is sketched below.
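The sketch (untested; the bucket, prefix and staging path below are placeholders): have Spark write the Hudi table to a staging location without Object Lock (local disk or a temporary bucket), then copy the resulting files into the locked bucket with boto3, supplying the Content-MD5 header per object myself. I would still need to verify that a Hudi table copied this way stays consistent (timeline, metadata table), so treat this as an idea rather than a solution.

import base64
import hashlib
import pathlib

import boto3

s3 = boto3.client('s3')

# Placeholders: staging directory written by the Spark/Hudi job, and the locked target.
STAGING_DIR = pathlib.Path('/tmp/hudi_staging/showroom_variants')
TARGET_BUCKET = 'my-object-lock-bucket'
TARGET_PREFIX = 'datalake/showroom_variants'


def upload_with_md5(local_path: pathlib.Path, bucket: str, key: str) -> None:
    """Upload a single file, adding the Content-MD5 header that Object Lock demands."""
    data = local_path.read_bytes()  # fine for a sketch; very large files would need multipart handling
    content_md5 = base64.b64encode(hashlib.md5(data).digest()).decode()
    s3.put_object(Bucket=bucket, Key=key, Body=data, ContentMD5=content_md5)


for path in STAGING_DIR.rglob('*'):
    if path.is_file():
        key = f"{TARGET_PREFIX}/{path.relative_to(STAGING_DIR).as_posix()}"
        upload_with_md5(path, TARGET_BUCKET, key)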
The S3A codebase has never been tested against an Object Lock bucket, as far as I'm aware. File a Hadoop JIRA with the stack trace. Don't expect any immediate response, and anything that comes of it will land in hadoop-3.4.x only, for which there are no Spark releases yet.
More likely: we'll cover the error message in the documentation as "not currently supported".
Update: created the JIRA: https://issues.apache.org/jira/browse/HADOOP-19080, "S3A to support writing to object lock buckets".