Getting duplicates in the table when an ETL job is run twice. The ETL job fetches data from RDS to an S3 bucket


The ETL job runs and executes properly, but because the table has no timestamp column, the data gets duplicated every time the same ETL job is run again. How can I perform staging and solve this problem with an upsert, or with some other approach? The only solutions I have found so far are to include a timestamp or to do staging. Is there any other way to get rid of this problem?


2 Answers

Yuriy Bondaruk (accepted answer)

To prevent duplicates on S3 you need to load the data already at the destination and filter out existing records before saving:

import org.apache.spark.sql.functions.col

// Keep only records whose id is not already present in the existing dataset
val deltaDf = newDataDf.alias("new")
  .join(existingDf.alias("existing"), col("new.id") === col("existing.id"), "left_outer")
  .where(col("existing.id").isNull)
  .select("new.*")

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map(
      "path" -> path
    )),
    transformationContext = "save_to_s3",
    format = "avro"
  ).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))

However, this approach only appends new records; it doesn't overwrite records that were updated at the source.

Another option is to also save updated records, together with some updated_at field that downstream consumers can use to pick the latest value for each record.
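
As a rough sketch of that idea (the updated_at column name and the allWritesDf dataframe, standing for everything appended to S3 so far, are illustrative assumptions, not part of the original job):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, current_timestamp, row_number}

// Stamp each batch with the time it was written before appending it to S3
val stampedDf = newDataDf.withColumn("updated_at", current_timestamp())

// A downstream consumer can then keep only the newest version of each id
val latestPerId = Window.partitionBy("id").orderBy(col("updated_at").desc)
val currentDf = allWritesDf
  .withColumn("rn", row_number().over(latestPerId))
  .where(col("rn") === 1)
  .drop("rn")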

You can also consider dumping the dataset into a separate folder each time you run your job (i.e. every day you have a full dump of the data in data/dataset_date=<year-month-day>):

import org.apache.spark.sql.functions._

val datedDf = sourceDf.withColumn("dataset_date", current_date())

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map(
      "path" -> path,
      "partitionKeys" -> Array("dataset_date")
    )),
    transformationContext = "save_to_s3",
    format = "avro"
  ).writeDynamicFrame(DynamicFrame(datedDf, glueContext))
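
A downstream job can then read back just the dump it needs. A minimal sketch, assuming spark-avro is on the classpath and using an example date:

import org.apache.spark.sql.functions.col

// Read one day's full dump back out of S3 by filtering on the partition column
val dailyDf = spark.read
  .format("avro")
  .load(path)
  .where(col("dataset_date") === "2019-01-31")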
Sandeep Fatangare

You can use overwrite mode while writing data to S3. It will replace the original data.
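
For example, with the plain Spark DataFrame writer rather than the Glue sink, something along these lines (reusing the sourceDf and path names from the first answer, and assuming spark-avro is available):

// "overwrite" replaces whatever already sits under `path`,
// so re-running the job cannot create duplicates
sourceDf.write
  .mode("overwrite")
  .format("avro")
  .save(path)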