We are building a data ingestion framework in PySpark and are trying to handle timestamp exceptions. Basically, we want to flag each record that does not conform to the schema in a separate reject column.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ("1988-06-15 11:55:12.1", "1"),
        ("1988-06-14 11:55:12", "3"),
        ("1988-06-13 11:55:12", "1"),
        ("1988-06-12 11:55:12", "2"),
    ],
    ["timestampColm", "intColm"],
)
```
We create a new column in the DataFrame called badRecords to capture any errors present in a row, and validate the timestamp column against the "yyyy-MM-dd HH:mm:ss" format.
We tried to validate the timestamp with the code below.
Sample 1
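```python
from pyspark.sql import functions as f
from pyspark.sql.functions import to_timestamp

# flag values that are present but cannot be parsed with the expected pattern
df1 = df.withColumn("badRecords",
    f.when(
        to_timestamp(f.col("timestampColm"), "yyyy-MM-dd HH:mm:ss").cast("Timestamp").isNull()
        & f.col("timestampColm").isNotNull(),
        f.lit("Not a valid Timestamp")
    ).otherwise(f.lit(None))
)
```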
This should mark the first record, "1988-06-15 11:55:12.1", as invalid because it does not match the "HH:mm:ss" format, but the record still passes validation instead of being rejected:
```
+--------------------+-----------+----------+
|       timestampColm|    intColm|badRecords|
+--------------------+-----------+----------+
|1988-06-15 11:55:...|          1|      null|
| 1988-06-14 11:55:12|       null|      null|
| 1988-06-13 11:55:12|          1|      null|
| 1988-06-12 11:55:12|          2|      null|
+--------------------+-----------+----------+
```
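The output is consistent with a lenient parser that stops once the pattern has been satisfied and ignores whatever text is left over. java.text.DateFormat.parse(String), whose SimpleDateFormat implementation Spark 2.x uses for pattern-based parsing, is documented as not necessarily using the entire string, so "1988-06-15 11:55:12.1" is likely read as 1988-06-15 11:55:12 and never flagged. A strict check has to reject the leftover ".1". The sketch below shows that behaviour with plain Python's datetime.strptime, which raises ValueError when unconverted text remains; it is a stand-in for illustration, not Spark's parser, and bad_record is a hypothetical helper name.

```python
from datetime import datetime

# Python equivalent of the Spark pattern "yyyy-MM-dd HH:mm:ss"
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def bad_record(value):
    """Return a rejection message if value is present but not exactly in TS_FORMAT."""
    if value is None:
        return None  # same role as the isNotNull() guard in the question
    try:
        datetime.strptime(value, TS_FORMAT)
    except ValueError:
        # raised for leftover text such as the ".1" fraction
        return "Not a valid Timestamp"
    return None


samples = [
    "1988-06-15 11:55:12.1",
    "1988-06-14 11:55:12",
    "1988-06-13 11:55:12",
    "1988-06-12 11:55:12",
]
for s in samples:
    print(s, bad_record(s))
```

If a row-level Python check is acceptable, a function like this could be wrapped with pyspark.sql.functions.udf and a StringType return type, at the cost of slower execution than built-in column expressions.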
After some analysis, we found that this could be done with unix_timestamp, but had no luck with that either.
Sample 2
```python
from pyspark.sql import functions as f

df1 = df.withColumn("badRecords",
    f.when(
        f.from_unixtime(
            f.unix_timestamp(f.col("timestampColm"), "yyyy-MM-dd HH:mm:ss")
        ).cast("timestamp").isNull() & f.col("timestampColm").isNotNull(),
        f.lit("Not a valid Timestamp")
    ).otherwise(f.lit(None))
)
```
Can you help me understand what I am missing? Why does the record still pass validation instead of being rejected?
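The unix_timestamp attempt is likely affected by the same lenient parsing, since it is driven by the same "yyyy-MM-dd HH:mm:ss" pattern. One alternative, swapped in here rather than taken from the question, is to check the layout of the string with an anchored regular expression so that any trailing characters fail the match. The sketch below tries such a pattern in plain Python; TS_PATTERN and matches_format are illustrative names.

```python
import re

# Anchored pattern for "yyyy-MM-dd HH:mm:ss": ^ and $ make the whole value match,
# so a trailing fraction such as ".1" fails
TS_PATTERN = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$"


def matches_format(value):
    """True if value is a non-null string laid out exactly as TS_PATTERN."""
    return value is not None and re.search(TS_PATTERN, value) is not None


for s in ["1988-06-15 11:55:12.1", "1988-06-14 11:55:12"]:
    print(s, matches_format(s))
```

In the DataFrame the same pattern string could be passed to Column.rlike, for example ~f.col("timestampColm").rlike(TS_PATTERN), and combined with the existing to_timestamp null check using '|'. The regex only checks the layout, not whether a value such as month 13 is a real date, so it complements the parse check rather than replacing it.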
In PySpark, '&' is the right operator for combining conditions. Python's 'and' cannot be overloaded, so Column objects use '&', '|' and '~' as logical AND, OR and NOT, and using 'and' between two columns raises an error. Consider enclosing each condition in brackets:

```python
df1 = df.withColumn("badRecords",
    f.when(
        (to_timestamp(f.col("timestampColm"), "yyyy-MM-dd HH:mm:ss").cast("Timestamp").isNull())
        & (f.col("timestampColm").isNotNull()),
        f.lit("Not a valid Timestamp")
    ).otherwise(f.lit(None))
)
```
That is, wrap each condition in brackets: (condition1) & (condition2). Hope this helps.
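Why '&' is used instead of 'and', and when the brackets matter, can be shown in plain Python. 'and' cannot be overloaded: it calls bool() on its left operand, which PySpark's Column refuses with a ValueError, so column conditions are combined with '&' instead. '&' also binds more tightly than comparison operators such as == or >, so conditions written with comparisons must be bracketed; method calls such as .isNull() already bind before '&'. The Cond class below is a hypothetical stand-in for a column expression, not PySpark's Column.

```python
# & binds more tightly than ==, so unbracketed comparisons group unexpectedly
x, y = 1, 2
without_parens = x == 1 & y == 2   # parsed as x == (1 & y) == 2
with_parens = (x == 1) & (y == 2)  # the intended "both conditions hold"


class Cond:
    """Hypothetical stand-in for a column expression: supports & but refuses bool()."""

    def __init__(self, name):
        self.name = name

    def __and__(self, other):
        # build a combined expression instead of evaluating anything
        return Cond(f"({self.name} AND {other.name})")

    def __bool__(self):
        # 'and' must call bool() on its left operand; refusing it mirrors PySpark's Column
        raise ValueError("use & instead of 'and'")


combined = Cond("a_is_null") & Cond("b_not_null")
try:
    Cond("a_is_null") and Cond("b_not_null")
    and_works = True
except ValueError:
    and_works = False

print(without_parens, with_parens, combined.name, and_works)
```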