Timestamp validation in pyspark

We are building a data ingestion framework in PySpark and trying to handle timestamp exceptions. Basically, we want to capture rejected records in a separate column when a value does not conform to the schema.

from pyspark.sql import functions as f
from pyspark.sql.functions import to_timestamp

df = spark.createDataFrame(
    [
        ("1988-06-15 11:55:12.1", "1"),
        ("1988-06-14 11:55:12", "3"),
        ("1988-06-13 11:55:12", "1"),
        ("1988-06-12 11:55:12", "2")
    ],
    ["timestampColm", "intColm"]
)
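
Note that with no explicit schema, both columns are inferred as plain strings, which is why the timestamp has to be validated at parse time. A quick check (output shown as comments):

df.printSchema()
# root
#  |-- timestampColm: string (nullable = true)
#  |-- intColm: string (nullable = true)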

We are creating a new column in the data frame, called badRecords, to capture all the errors that might be present in this dataframe, and validating the timestamp column against the "yyyy-MM-dd HH:mm:ss" format.

Trying to validate the timestamp with the code below.

Sample 1

df1 = df.withColumn("badRecords",
                f.when(
                        to_timestamp(f.col("timestampColm"), "yyyy-MM-dd HH:mm:ss").cast("Timestamp").isNull() & f.col("timestampColm").isNotNull(),f.lit("Not a valid Timestamp")
                       ).otherwise(f.lit(None))
              )

So it should mark the first record, "1988-06-15 11:55:12.1", as invalid, since the fractional ".1" does not match the "HH:mm:ss" part of the format. But the record still passes validation and is not rejected:

+--------------------+-----------+----------+
|       timestampColm|    intColm|badRecords|
+--------------------+-----------+----------+
|1988-06-15 11:55:...|          1|      null|
| 1988-06-14 11:55:12|          3|      null|
| 1988-06-13 11:55:12|          1|      null|
| 1988-06-12 11:55:12|          2|      null|
+--------------------+-----------+----------+
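
To see what Spark actually returns for the offending value, the parsed column can be projected directly (a diagnostic sketch, assuming the df defined above):

# Show the raw string next to what to_timestamp returns for it. If the parser
# is lenient (e.g. the legacy SimpleDateFormat path in Spark 2.x), the value
# "1988-06-15 11:55:12.1" still yields a non-null timestamp, so the isNull()
# branch of the badRecords check above never fires.
df.select(
    f.col("timestampColm"),
    to_timestamp(f.col("timestampColm"), "yyyy-MM-dd HH:mm:ss").alias("parsed")
).show(truncate=False)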

After some analysis, I found that this might be done with unix_timestamp, but no luck there either.

Sample 2

df1 = df.withColumn("badRecords",
                      f.when(
                            f.from_unixtime(
                                  f.unix_timestamp(
                                         f.col("timestampColm"),"yyyy-MM-dd HH:mm:ss")
                            ).cast("timestamp").isNull() & f.col("timestampColm").isNotNull(),
                            f.lit("Not a valid Timestamp")
                    ).otherwise(f.lit(None))
                )

Help me understand what I am missing that causes the record to still be treated as valid instead of being rejected.

There are 2 answers below.

Answer 1 (score 4)

In your condition you have written &, which is actually correct here: in plain Python, & is a bitwise operator and 'and' does the logical operation, but PySpark overloads & on Column objects as the logical AND (Python's 'and' cannot be used on Columns at all). What matters is operator precedence: & binds more tightly than comparison operators, so each condition should be enclosed in parentheses. Consider trying this one:

df1 = df.withColumn(
    "badRecords",
    f.when(
        (to_timestamp(f.col("timestampColm"), "yyyy-MM-dd HH:mm:ss").cast("Timestamp").isNull())
        & (f.col("timestampColm").isNotNull()),
        f.lit("Not a valid Timestamp")
    ).otherwise(f.lit(None))
)

That is, consider adding brackets to enclose each condition, like (condition1) & (condition2). Hope this helps.
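
As a generic illustration of why the brackets matter (not from the original answer; columns a and b are hypothetical):

# Without parentheses, & binds tighter than ==, so Python parses this as a
# chained comparison f.col("a") == (1 & f.col("b")) == 2, which raises
# "Cannot convert column into bool" instead of building the intended filter:
# bad = f.col("a") == 1 & f.col("b") == 2

# With parentheses, each comparison becomes a Column first, then & combines them:
good = (f.col("a") == 1) & (f.col("b") == 2)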

Answer 2 (score 0)

I was able to solve this by creating a custom UDF, and it is working fine.

from pyspark.sql.functions import udf

validate_timestamp_udf = udf(lambda val: validate_timestamp(val))
df6 = df2.withColumn("badRecords", validate_timestamp_udf(f.col(ColName)))

And in the validate_timestamp() function, I am doing the format validation with the help of a regex.
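
The answer does not show the body of validate_timestamp(); a minimal sketch of what a regex-based version might look like (the pattern and the strptime double-check are assumptions, not the original code):

import re
from datetime import datetime

TS_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$")

def validate_timestamp(val):
    # Nulls pass through unflagged, matching the isNotNull() guard in Sample 1.
    if val is None:
        return None
    # Reject anything not strictly "yyyy-MM-dd HH:mm:ss" (e.g. ".1" fractions).
    if not TS_PATTERN.match(val):
        return "Not a valid Timestamp"
    # A regex alone cannot catch out-of-range values such as month 13,
    # so parse the value for real as a second check.
    try:
        datetime.strptime(val, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return "Not a valid Timestamp"
    return None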