Starting from this example, I used Locality-Sensitive Hashing (LSH) in PySpark to find duplicated documents.
Some notes about my DB: I have 4M text files. Each file has 20K chars on average. Currently, I'm considering only the first 500 chars of each doc.
When I increase the number of chars from 500 to 1000, I get memory errors.
I've tried working on the parameters of the pipeline. I know I can avoid memory errors by increasing n in NGram and decreasing numHashTables in MinHashLSH. However, this increases false negatives too much.
Are there any other steps in the pipeline that could improve performance?
My aim is to increase the number of chars from 500 to 2000 without memory errors or very long computation times (ideally, computation time < 6h).
This is my code with fake data:
# Parameters
# NGram
n_gram = 2  # really, I use n_gram=8 because I have 500 chars per document
# MinHashLSH
hash_tables = 10  # really, I use hash_tables=3 to avoid memory errors and too long a computation time
# Jaccard distance threshold
threshold_test = 0.5
from pyspark.sql.functions import regexp_replace, trim, col, length, sort_array, array

# Fake dataframe
df = spark.createDataFrame([
    (0, "Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),
    (1, "I wish Java could use case classes I wish Java could use case classes!!"),
    (2, "Logistic, regression, models, are, neat, etc, etc, etc, etc, etc, etc, etc, etc"),
    (3, "Hi I heard about Spork Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),
    (4, "Hi I heard about Java Hi I heard about Java Hi I heard about Java Hi I heard about Java")
], ["id", "text"])
# clean punctuation and double spaces
df = df.withColumn("text", regexp_replace('text', r'\p{Punct}', ''))
df = df.withColumn("text", regexp_replace('text', r' (?= |$)', ''))
# trim whitespace and filter out texts that are too short
df = df.withColumn("text", trim(col("text")))\
       .filter((col('text') != "") & (length(col('text')) > n_gram*3))
df.show(5,False)
# LSH pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH
db = df
query = df
model = Pipeline(stages=[
    RegexTokenizer(
        pattern="", inputCol="text", outputCol="tokens", minTokenLength=1
    ),
    NGram(n=n_gram, inputCol="tokens", outputCol="ngrams"),
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=hash_tables)
]).fit(db)
db_hashed = model.transform(db)
query_hashed = model.transform(query)
output = model.stages[-1].approxSimilarityJoin(db_hashed, query_hashed, threshold_test)
# similar pairs of documents:
output.filter(col('datasetA.id') != col('datasetB.id'))\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("datasetA.text").alias("textA"),
            col("datasetB.text").alias("textB"),
            col("distCol"))\
    .sort(col("distCol"))\
    .withColumn('comb', sort_array(array(*('idA', 'idB'))))\
    .dropDuplicates(['comb'])\
    .show()
I would consider adding an interim step.
If you're searching for duplicates, you can actually remove any n-grams that are unique: an n-gram that appears in only one document can never be shared by a candidate pair. This does require an interim step of passing over the data more than once, but it does help to reduce the amount of interim data. Here's a good paper speaking to the benefits.
I'd suggest doing this after you tokenize and before you hash, so that you're working with the smallest dataset possible. Clearly it's been some time since you asked this; did you find a better answer elsewhere?
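Roughly what I have in mind, as a sketch only (I'm reusing the parameters from your code; exploded, doc_counts and shared are names I'm inventing here):

from pyspark.sql import functions as F
from pyspark.ml.feature import RegexTokenizer, NGram

# Run the tokenizer and NGram outside the Pipeline so the filter can sit
# between NGram and HashingTF (same parameters as in your code).
tokenizer = RegexTokenizer(pattern="", inputCol="text", outputCol="tokens", minTokenLength=1)
ngrammer = NGram(n=n_gram, inputCol="tokens", outputCol="ngrams")
df_ngrams = ngrammer.transform(tokenizer.transform(df))

# One row per (id, n-gram).
exploded = df_ngrams.select("id", F.explode("ngrams").alias("ngram"))

# Keep only n-grams that occur in more than one document; an n-gram seen in a
# single document can never be shared by a candidate pair.
doc_counts = exploded.groupBy("ngram").agg(F.countDistinct("id").alias("doc_count"))
shared = exploded.join(doc_counts.filter(F.col("doc_count") > 1), "ngram")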
You can see that it makes a difference in the amount of data that you will end up processing:
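For instance, a quick check with the hypothetical exploded and shared DataFrames from the sketch above:

# Compare how many (id, n-gram) rows survive the filter.
print("rows before filtering:", exploded.count())
print("rows after filtering: ", shared.count())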
You can then roll the data back up:
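Again only a sketch, assuming the shared DataFrame from above; the shortened pipeline keeps just the hashing stages, since tokenizing and n-gramming have already happened:

# Collapse the surviving n-grams back to one array column per document
# and re-attach the original text.
rolled_up = (shared.groupBy("id")
                   .agg(F.collect_list("ngram").alias("ngrams"))
                   .join(df.select("id", "text"), "id"))

# Hash exactly as before, just without the tokenizer/NGram stages.
lsh_model = Pipeline(stages=[
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=hash_tables)
]).fit(rolled_up)
db_hashed = lsh_model.transform(rolled_up)  # in your code db and query are the same df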
Finally, if you aren't going to use them in the output, I suggest dropping all the columns you aren't using, as they still take up space.
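For example, with the column names your pipeline produces, something like:

# tokens and ngrams are dead weight once the vectors exist; drop them
# before the join so they are not carried through the shuffle.
db_hashed = db_hashed.drop("tokens", "ngrams")
query_hashed = query_hashed.drop("tokens", "ngrams")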
You might be able to further juice the performance a little. I looked at the plan and it's running your regex in a UDF. You might be able to tweak the performance a little by using mapPartitions instead; this way you could keep the regex compiled and cached per partition. But Spark does funny tricks, so it could be doing that already. Worth a shot if you want to decrease memory pressure.
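A rough sketch of what I mean, assuming you only need id and the cleaned text (note that [^\w\s] is only an approximate Python stand-in for the Java \p{Punct} class used by regexp_replace):

import re

def clean_partition(rows):
    # Compile the patterns once per partition instead of per row.
    punct = re.compile(r'[^\w\s]')
    spaces = re.compile(r'\s+')
    for row in rows:
        text = spaces.sub(' ', punct.sub('', row['text'])).strip()
        yield (row['id'], text)

# This would replace the two regexp_replace/trim lines in your cleaning step.
df = df.rdd.mapPartitions(clean_partition).toDF(["id", "text"])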