How to control parallelism / concurrency for main() in PySpark job?

I have a PySpark job defined in a main() function.

It reads a DataFrame from HDFS and, for every row, calls the OpenAI API, feeding row['text'] as input and saving the response into a new column 'response'.

df = spark.read.format('avro').load(data_path)
for row in df.toLocalIterator():
    result = chain.run(row['text'])
    # ... the response is then stored in a new 'response' column (code omitted)

Now I essentially need 10 clients calling the OpenAI API concurrently. In other words, I need the work in main() to be executed by 10 workers/executors/processes/threads/etc. simultaneously.

(The PySpark job is hosted by Azkaban, i.e., it's executed inside an Azkaban flow.)

What is the best way to achieve this? Any help is appreciated :)

1 Answer

Answer by boyangeor:

df.toLocalIterator() returns the rows to the driver, so even if you have N executors/slots, the rows are not processed by them. My suggestion is to wrap your OpenAI call in a UDF, and then make sure your input DataFrame df has at least 10 partitions so that each slot can process a partition in parallel.

from pyspark.sql import functions as F, types as T

@F.udf(returnType=T.StringType())
def call_openai_udf(text: str) -> str:
    result = call_openai(text)  # implement this; it runs on the executors, one row at a time
    return result


df = spark.read.format('avro').load(data_path)
df = df.repartition(10)  # if necessary, so there are at least 10 tasks that can run in parallel
df = df.withColumn('result', call_openai_udf(F.col('text')))
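One caveat (not part of the original answer): withColumn is lazy, so the UDF, and with it the OpenAI calls, only runs when an action is executed, and the effective concurrency is the minimum of the number of partitions and the number of available executor slots. A minimal sketch of triggering the computation, assuming a hypothetical output_path on HDFS:

# Writing the result is the action that actually triggers the UDF on the executors.
# 'output_path' is a placeholder for wherever the enriched data should land.
df.write.mode('overwrite').format('avro').save(output_path)

To actually get 10 concurrent API clients, the job also needs at least 10 executor slots, e.g. by submitting with something like spark-submit --num-executors 10 --executor-cores 1 on YARN; adjust this to however the Azkaban flow launches the job.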