I have a PySpark DataFrame to which I need to add a new column containing a unique ID per batch of rows. For example, I need to generate and assign one unique ID to the first batch of 100 rows, another to the next 100, and so on for each batch of 100.
How can I get this done efficiently?
Since you have not shared the schema of your DataFrame, I am assuming it has an "id" column; you can swap in any other column as your requirement demands.
You can simply apply the row_number() window function as follows to get the desired result:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# 100 in the denominator is the batch size
df = df.withColumn(
    "unique_batch_id",
    # row_number() is 1-based, so subtract 1 before dividing; casting the
    # result to integer truncates it, giving 0 for rows 1-100, 1 for rows
    # 101-200, and so on. Note that a window with no partitionBy() pulls
    # all rows into a single partition, so this can be slow on large data.
    ((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
    .cast("integer")
)
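For reference, here is a minimal self-contained sanity check of the idea (the sample data and the "value" column name are made up for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data: 250 rows, so we expect batches of 100, 100 and 50.
df = spark.range(250).withColumnRenamed("id", "value")

df = df.withColumn(
    "unique_batch_id",
    ((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
    .cast("integer")
)

df.groupBy("unique_batch_id").count().orderBy("unique_batch_id").show()
# batch 0 -> 100 rows, batch 1 -> 100 rows, batch 2 -> 50 rows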
Update 1: As per your reply, you don't have an "id"-like column, so I have added monotonically_increasing_id() to make the code above work properly and fulfil your requirement.
Update 2: As per the comments, you need a UUID rather than an integer as the batch ID, so I came up with the following workaround, extending the previous code:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id, udf
import hashlib
import uuid

# Integer batch ID as before, cast to string so it can be hashed below.
df = df.withColumn(
    "batch_id",
    ((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
    .cast("integer").cast("string")
)

def generate_uuid(batch_id):
    # MD5 digests are exactly 16 bytes, which is what uuid.UUID(bytes=...)
    # expects, so every row in the same batch gets the same deterministic UUID.
    return str(uuid.UUID(bytes=hashlib.md5(batch_id.encode()).digest()))

uuid_udf = udf(generate_uuid)
df = df.withColumn("uuid", uuid_udf(df["batch_id"])).drop("batch_id")
I hope I understood your question correctly.
monotonically_increasing_id() will create a unique ID for each row. Note that the generated IDs are guaranteed to be increasing and unique, but not consecutive.
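A minimal sketch of what that looks like (the "row_id" column name is just for illustration):

from pyspark.sql.functions import monotonically_increasing_id

# The generated 64-bit IDs are unique and increasing, but not consecutive:
# the upper bits encode the partition ID, so you cannot simply divide them
# by 100 to form even batches - you still need row_number() for that.
df = df.withColumn("row_id", monotonically_increasing_id())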