Spark SQL in Scala: incrementally extract data from a MySQL database into Hudi


I need to write Scala code that uses Spark to incrementally extract data from the user_info table in MySQL's shtd_store database into the user_info table in Hudi's ODS library (path /user/hive/warehouse/ods.db). The incremental field is operate_time or create_time in the ods.user_info table: for each row in MySQL, the larger of the two times is taken as the incremental value and compared against the larger of the two fields already in ods. If operate_time is empty, it is filled with create_time. The partition field is etl_date, of type String, and its value is today's date (partition field format y-M-d). id is the primaryKey and operate_time is the preCombineField. I don't have a Hudi table yet and don't know how to create one, so I could extract user_info in full first and then extract it incrementally.
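The column logic I have in mind looks roughly like this (a minimal sketch using Spark SQL functions; jdbcDF stands for the DataFrame read from MySQL as in my code further down, and the y-M-d pattern is taken straight from the requirement above):

import org.apache.spark.sql.functions.{coalesce, col, current_date, date_format}

val prepared = jdbcDF
  // if operate_time is empty, fill it with create_time
  .withColumn("operate_time", coalesce(col("operate_time"), col("create_time")))
  // today's date as a String partition column, pattern from the requirement
  .withColumn("etl_date", date_format(current_date(), "y-M-d"))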

How should I write this code? Ideally, could you provide the complete code for both the full extraction and the incremental extraction? Thanks!

import org.apache.spark.sql.{SaveMode, SparkSession}

object MTHU {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("MySQL to Hudi")
      .getOrCreate()

    // full read of user_info from MySQL over JDBC
    val jdbcDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.49.201:3306/shtd_store")
      .option("dbtable", "user_info")
      .option("user", "root")
      .option("password", "123456")
      .load()

    // upsert into Hudi, keyed on id and deduplicated on operate_time;
    // written as non-partitioned for now (partitionpath.field commented out)
    jdbcDF.write
      .format("org.apache.hudi")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "operate_time")
//      .option("hoodie.datasource.write.partitionpath.field", "etl_date")
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.datasource.write.table.name", "hudi_table")
//      .option("hoodie.datasource.hive_sync.enable", "true")
//      .option("hoodie.datasource.hive_sync.table", "hive_table")
//      .option("hoodie.datasource.hive_sync.partition_fields", "etl_date")
      .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .mode(SaveMode.Append)
      .save("/user/hive/warehouse/ods_ds_hudi.db/user_info")
  }
}

This is my code, but it does not run: it reports an error that the data source cannot be found. Maybe there is no local table. My original task is: write Scala code that uses Spark to incrementally extract data from the tables user_info, sku_info, base_province, base_region, order_info, and order_detail in MySQL's shtd_store database into Hudi's ods_ds_hudi library (the path is /user/hive/warehouse/ods_ds_hudi.db). (If some tables in the ods_ds_hudi database have no data, just extract them normally.)
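From what I have read, "data source cannot be found" for format("org.apache.hudi") usually means the Hudi Spark bundle jar is not on the classpath rather than a missing table, and Hudi also expects Kryo serialization. A minimal sketch of the setup I believe is needed (the bundle coordinate in the comment is an assumption and must match your Spark and Scala versions):

// launch with the Hudi bundle on the classpath, e.g. (coordinate is an assumption):
//   spark-submit --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.1 ...
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("MySQL to Hudi")
  // Hudi relies on Kryo for serialization
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()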

  1. Extract the incremental data of user_info in the shtd_store database into the table user_info in Hudi's ods_ds_hudi library, using operate_time or create_time in ods_ds_hudi.user_info as the incremental field (that is, for each row in MySQL, the larger of the two times is compared with the larger of the two fields in ods). Extract only the newly added data, keep the field names and types unchanged, and add a partition at the same time. If operate_time is empty, fill it with create_time. The partition field is etl_date, of type String, with value 20231129, the date of the day (partition field format yyyyMMdd). id is used as primaryKey and operate_time as preCombineField. The question is about incremental extraction, but in my test environment the Hudi table has no underlying data yet, so I need to extract all of the data into Hudi first; a sketch of what I am trying is below.
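Here is a minimal sketch of the full flow for user_info as I understand it, assuming operate_time and create_time are TIMESTAMP columns and reusing the connection details from my code above (the object name and the try/catch first-run check are my own choices, not a confirmed Hudi pattern): the first run extracts everything, later runs keep only rows whose greatest(operate_time, create_time) is newer than the maximum already stored in Hudi.

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, current_date, date_format, greatest, lit, max}

object UserInfoFullThenIncremental {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("MySQL to Hudi")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    val hudiPath = "/user/hive/warehouse/ods_ds_hudi.db/user_info"

    // full snapshot of user_info from MySQL
    val mysqlDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.49.201:3306/shtd_store")
      .option("dbtable", "user_info")
      .option("user", "root")
      .option("password", "123456")
      .load()

    // if the Hudi table already exists, keep only rows newer than the
    // largest greatest(operate_time, create_time) already stored in ods
    val toWrite: DataFrame =
      try {
        val existing = spark.read.format("org.apache.hudi").load(hudiPath)
        val maxTime = existing
          .select(max(greatest(col("operate_time"), col("create_time"))))
          .first()
          .getTimestamp(0)
        if (maxTime == null) mysqlDF
        else mysqlDF.filter(greatest(col("operate_time"), col("create_time")) > lit(maxTime))
      } catch {
        // first run: the path does not exist yet, so extract everything
        case _: Exception => mysqlDF
      }

    val prepared = toWrite
      // if operate_time is empty, fill it with create_time
      .withColumn("operate_time", coalesce(col("operate_time"), col("create_time")))
      // partition column: today's date as a String in yyyyMMdd format
      .withColumn("etl_date", date_format(current_date(), "yyyyMMdd"))

    prepared.write
      .format("org.apache.hudi")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "operate_time")
      .option("hoodie.datasource.write.partitionpath.field", "etl_date")
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.datasource.write.table.name", "user_info")
      .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
      .mode(SaveMode.Append)
      .save(hudiPath)

    spark.stop()
  }
}

With etl_date as the partition path, SimpleKeyGenerator replaces the NonpartitionedKeyGenerator from my earlier attempt; the other tables would presumably follow the same pattern with their own key and precombine fields.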