Getting the current timestamp for each row of a DataFrame using Spark / Java


I want to get the current timestamp of each row.

I use the following code:

dataframe.withColumn("current_date",current_timestamp());

But current_timestamp() is evaluated only once per query (at the start of query evaluation), not once per row, so I always get the same value in every row.

How can I get a timestamp that is evaluated separately for each row of the DataFrame?
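To make this behaviour concrete, here is a toy model in plain Python. No Spark is involved, and every class and function name in it is hypothetical. It is only loosely inspired by Spark's documented semantics: current_timestamp() is fixed once, before any row is processed, whereas an expression that the engine has to call for every row can return a different value each time.

```python
import time
from dataclasses import dataclass
from typing import Callable, Union


@dataclass(frozen=True)
class Literal:
    value: float


@dataclass(frozen=True)
class CurrentTimestamp:
    """Placeholder for current_timestamp() in the unplanned expression."""


@dataclass(frozen=True)
class PerRowCall:
    fn: Callable[[], float]


Expr = Union[Literal, CurrentTimestamp, PerRowCall]


def fix_current_time(expr: Expr, query_start: float) -> Expr:
    # Toy planning step: current_timestamp() is replaced by a literal
    # captured once, before any row is processed.
    return Literal(query_start) if isinstance(expr, CurrentTimestamp) else expr


def evaluate(expr: Expr) -> float:
    if isinstance(expr, Literal):
        return expr.value
    if isinstance(expr, PerRowCall):
        return expr.fn()  # called again for every row
    raise TypeError(f"unplanned expression: {expr!r}")


def run_query(rows, expr: Expr):
    planned = fix_current_time(expr, time.time())
    out = []
    for row in rows:
        out.append(dict(row, current_date=evaluate(planned)))
        time.sleep(0.02)  # simulated per-row work, so the clock visibly advances
    return out


rows = [{"id": i} for i in range(3)]
once = run_query(rows, CurrentTimestamp())
per_row = run_query(rows, PerRowCall(time.time))

print(len({r["current_date"] for r in once}))  # 1
print(len({r["current_date"] for r in per_row}))
```

The only difference between the two runs is whether the clock is read during planning or during row processing. That is why the answers below switch to expressions that Spark must call per row.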

I need your help.

Thank you.


BEST ANSWER

Try this (the code is Scala, but expr(...) and the SQL reflect function work the same way from the Java API):


    import org.apache.spark.sql.functions.expr

    df2.withColumn("current_date", expr("reflect('java.lang.System', 'currentTimeMillis')"))
      .show(false)

    /**
      * +-----+------+-------------+
      * |class|gender|current_date |
      * +-----+------+-------------+
      * |1    |m     |1594137247247|
      * |1    |m     |1594137247247|
      * |1    |f     |1594137247247|
      * |2    |f     |1594137247272|
      * |2    |f     |1594137247272|
      * |3    |m     |1594137247272|
      * |3    |m     |1594137247272|
      * +-----+------+-------------+
      */

    df2.withColumn("current_date", expr("reflect('java.time.LocalDateTime', 'now')"))
      .show(false)

    /**
      * +-----+------+-----------------------+
      * |class|gender|current_date           |
      * +-----+------+-----------------------+
      * |1    |m     |2020-07-07T21:24:07.377|
      * |1    |m     |2020-07-07T21:24:07.378|
      * |1    |f     |2020-07-07T21:24:07.378|
      * |2    |f     |2020-07-07T21:24:07.398|
      * |2    |f     |2020-07-07T21:24:07.398|
      * |3    |m     |2020-07-07T21:24:07.398|
      * |3    |m     |2020-07-07T21:24:07.398|
      * +-----+------+-----------------------+
      */
    // reflect() returns a string column. The LocalDateTime value (ISO-8601 text such as
    // 2020-07-07T21:24:07.377) can be turned into a timestamp with .cast("timestamp").
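The reflect approach works because the named static method is called again for each row, so each call reads the clock anew. The snippet below is only a rough plain-Python analogue of that idea. The reflect helper here is hypothetical and stdlib-only, not Spark's implementation. Like Spark's reflect, it looks a function up by name, calls it, and returns the result as a string.

```python
import importlib
import time


def reflect(module_name: str, func_name: str, *args) -> str:
    """Hypothetical, rough analogue of Spark SQL's reflect(class, method, ...):
    look the function up by name, call it, and return the result as a string."""
    func = getattr(importlib.import_module(module_name), func_name)
    return str(func(*args))


rows = [
    {"class": 1, "gender": "m"},
    {"class": 2, "gender": "f"},
    {"class": 3, "gender": "m"},
]

stamped = []
for row in rows:
    # The lookup and call happen once per row, so each row reads the clock itself.
    stamped.append(dict(row, current_date=reflect("time", "time_ns")))
    time.sleep(0.02)  # only so that this toy run shows distinct values

for r in stamped:
    print(r)

print(reflect("math", "sqrt", 16.0))  # 4.0
```

Because the result is a string, a cast is still needed to get a real timestamp. This matches the note about casting in the Scala answer.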
ANOTHER ANSWER

Plain Python expressions don't help either. They are evaluated once on the driver, before Spark ever sees them, so the following code also gives the same time value for every row:

dataframe.withColumn("current_date", F.lit( time.time()))

Wrapping the call in a UDF, however, makes Spark evaluate it at run time, on the executors, once per row:

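import time

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def get_time():
    # Runs on the executors, once for every row
    return time.time()

# DoubleType keeps the epoch seconds numeric (udf's default return type is StringType);
# asNondeterministic() stops the optimizer from assuming repeated calls return the same value
time_udf = udf(get_time, DoubleType()).asNondeterministic()

dataframe.withColumn("current_date", time_udf())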

Hope this helps!!
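The difference between the two Python versions comes down to when time.time() is called. In F.lit(time.time()), Python evaluates the argument once, on the driver, before lit even runs. Passing get_time to udf hands Spark the function itself, to be called later for each row. Here is a plain-Python sketch of that distinction. The lit, udf_call, with_column and slow_clock helpers are hypothetical stand-ins, not PySpark.

```python
import time


def slow_clock() -> float:
    # Hypothetical helper: sleeps briefly so consecutive readings differ visibly.
    time.sleep(0.02)
    return time.time()


def lit(value):
    # Toy stand-in for F.lit: the value is already computed; every row gets it.
    return lambda row: value


def udf_call(fn):
    # Toy stand-in for calling a zero-argument UDF: fn runs again for every row.
    return lambda row: fn()


def with_column(rows, name, column):
    return [dict(row, **{name: column(row)}) for row in rows]


rows = [{"id": i} for i in range(3)]

# slow_clock() runs here, once, before lit() is even called (eager argument evaluation).
constant = with_column(rows, "current_date", lit(slow_clock()))
# The function object itself is passed, so the call is deferred until each row is processed.
per_row = with_column(rows, "current_date", udf_call(slow_clock))

print(len({r["current_date"] for r in constant}))  # 1
print(len({r["current_date"] for r in per_row}))
```

The same reasoning explains why the UDF version works. Spark receives something it must call, not a value that was already computed.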