PySpark - use GoogleTranslator with withColumn


I have a name column containing text in different languages. In the end, I need everything in English.

The call below works for a single variable, but how can I do this for the complete DataFrame, for one or more columns?

from deep_translator import GoogleTranslator
reader = "df"

NAME_ORIG = "ich suche den namen"
translated = GoogleTranslator(source='auto', target='en').translate(NAME_ORIG)

There are 2 answers below.

werner (BEST ANSWER)

Simple approach using UDF

As suggested here, use a UDF:

from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = ...

df = spark.createDataFrame([
    ['ich suche den namen'],
    ['guten tag']],
    ['name'])

@F.udf(returnType=T.StringType())
def translate(input):
    # import inside the UDF so it runs on the executors;
    # a new GoogleTranslator object is created for every row
    from deep_translator import GoogleTranslator
    return GoogleTranslator(source='auto', target='en').translate(input)

df.withColumn('translation', translate(F.col('name'))).show()
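
If more than one column needs translation, as the question asks, the same UDF can simply be applied once per column. A minimal sketch, assuming a hypothetical second column description just for illustration:

# 'description' is a made-up second column, only for illustration
df2 = spark.createDataFrame(
    [['ich suche den namen', 'guten tag']],
    ['name', 'description'])

df2.withColumn('name_en', translate(F.col('name'))) \
   .withColumn('description_en', translate(F.col('description'))) \
   .show()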

Using mapPartitions

The UDF approach creates a new GoogleTranslator object for each row.

The documentation of deep-translator says

You can also reuse the Translator class and change/update its properties.

(Notice that this is important for performance too, since instantiating new objects is expensive)

Reusing the translator object can be achieved by using mapPartitions. Only one translator object per partition will be created:

def translatePartition(rows):
    from deep_translator import GoogleTranslator
    # one translator per partition, reused for every row in that partition
    dt = GoogleTranslator(source='auto', target='en')
    for row in rows:
        yield (row['name'], dt.translate(row['name']))

df.rdd.mapPartitions(translatePartition).toDF(["name", "translation"]).show()

Using mapPartitions with batches

The deep-translator API offers a translate_batch function. It can also be used by preparing the batches inside mapPartitions:

BATCH_SIZE = 5

def translatePartitionWithBatch(rows):
    from deep_translator import GoogleTranslator
    # one translator per partition, reused for every batch
    dt = GoogleTranslator(source='auto', target='en')

    def translateBatch(batch):
        translations = dt.translate_batch(batch)
        for text, translation in zip(batch, translations):
            yield (text, translation)

    # collect rows into batches of BATCH_SIZE and translate each batch in one call
    batch = []
    for row in rows:
        batch.append(row['name'])
        if len(batch) >= BATCH_SIZE:
            yield from translateBatch(batch)
            batch = []
    if len(batch) > 0:
        yield from translateBatch(batch)

df.rdd.mapPartitions(translatePartitionWithBatch).toDF(["name", "translation"]).show()

Using GoogleTranslator.translate_batch instead of GoogleTranslator.translate may or may not improve the performance further.

In all three approaches the output is the same:

+-------------------+------------------------+
|name               |translation             |
+-------------------+------------------------+
|ich suche den namen|I'm looking for the name|
|guten tag          |Good day                |
+-------------------+------------------------+
user2704177

You can create a PySpark UDF around your Google Translate function, and then call that UDF on your DataFrame with withColumn.
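
A minimal sketch of that idea, assuming the same df with a name column as in the accepted answer (the UDF and output column names are illustrative):

from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.udf(returnType=T.StringType())
def translate_to_en(text):
    # import inside the UDF so it runs on the executors
    from deep_translator import GoogleTranslator
    return GoogleTranslator(source='auto', target='en').translate(text)

df.withColumn('name_en', translate_to_en(F.col('name'))).show()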