I have two datasets, dfA (5M records) and dfB (6K records). I fit the LSH model on Spark 2.2:
val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")
val brp_model = brp.fit(dfA)
val dfA_transformed = brp_model.transform(dfA)
val dfB_transformed = brp_model.transform(dfB)
I am tracking one pair of records that lies at a distance of 17.59:
val uidB = 601295446577L
val key_s = dfB_transformed
  .filter(col("uid") === lit(uidB))
  .select("features")
  .as[String].collect()(0)
val key = Vectors.parse(key_s).asML
brp_model
  .approxNearestNeighbors(dfA_transformed, key, 2, "distance")
  .drop("features")
  .show(5, false)
+-------------+-----------------------+------------------+
|uid |hashes |distance |
+-------------+-----------------------+------------------+
|1194000912899|[[0.0], [-1.0], [-1.0]]|17.592033410506907|
|163208761881 |[[0.0], [-1.0], [-1.0]]|19.912395647390348|
+-------------+-----------------------+------------------+
So far, so good. But when I use approxSimilarityJoin, I don't get my pair at distance 17.59, only other records from A at larger distances:
brp_model
  .approxSimilarityJoin(dfA_transformed, dfB_transformed, 25.0, "distance")
  .persist(MEMORY_AND_DISK)
  .filter(col("datasetB.uid") === lit(uidB))
+-------------+------------+------------------+
| uidA| uidB| distance|
+-------------+------------+------------------+
| 128849023798|601295446577|20.977834057053013|
|1005022360587|601295446577|21.919213729270727|
| 463856471960|601295446577|22.595725081515273|
| 670014905945|601295446577|23.396613579631136|
|1262720389581|601295446577| 24.03850371925476|
|1073741843710|601295446577|24.095447353196946|
+-------------+------------+------------------+
More precisely, if I use persist I never get that pair, whereas without persist it is always present.
On each run, the size of the resulting dataset fluctuates heavily:
brp_model
  .approxSimilarityJoin(dfA_transformed, dfB_transformed, 25.0, "distance")
  .persist(MEMORY_AND_DISK)
  .count()
// 20741736, 18820380, 20772153
brp_model
  .approxSimilarityJoin(dfA_transformed, dfB_transformed, 25.0, "distance")
  .count()
// 19371911, 17323851, 20074502
I understand the random nature of hash collisions. Nevertheless:
- Is there anything wrong with using persist here?
- What can be done to improve the accuracy of approxSimilarityJoin? I see no tools for evaluating how good the bucketLength is.
- If I know A and B ahead of time, should I fit brp on the union of A and B?
- Would it be more stable to iterate over B and query each record by key?
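
To get some intuition for the bucketLength question, here is a minimal pure-Scala sketch. It is not Spark's implementation; it only uses the hash from the Spark ML guide, h(x) = floor(x · v / bucketLength) with v a random unit vector. It estimates how often two given points share a bucket in at least one of numHashTables tables, which is my understanding of the condition for a pair to become a candidate in the join. The object name, the helper names, and the trials and seed parameters are all made up for illustration.

```scala
import scala.util.Random

// Hypothetical helper, not part of Spark.
object BrpCollisionSketch {

  // Random unit vector: Gaussian components, normalised to length 1.
  def randomUnitVector(dim: Int, rng: Random): Array[Double] = {
    val g = Array.fill(dim)(rng.nextGaussian())
    val norm = math.sqrt(g.map(c => c * c).sum)
    g.map(_ / norm)
  }

  // Hash from the Spark ML guide: h(x) = floor(x . v / bucketLength).
  def hash(x: Array[Double], v: Array[Double], bucketLength: Double): Double = {
    val dot = x.zip(v).map { case (a, b) => a * b }.sum
    math.floor(dot / bucketLength)
  }

  // Fraction of trials in which x and y fall into the same bucket
  // in at least one of numHashTables independently drawn tables.
  def collisionRate(
      x: Array[Double],
      y: Array[Double],
      bucketLength: Double,
      numHashTables: Int,
      trials: Int,
      seed: Long): Double = {
    require(x.length == y.length, "vectors must have the same dimension")
    val rng = new Random(seed)
    val hits = (1 to trials).count { _ =>
      (1 to numHashTables).exists { _ =>
        val v = randomUnitVector(x.length, rng)
        hash(x, v, bucketLength) == hash(y, v, bucketLength)
      }
    }
    hits.toDouble / trials
  }
}
```

Calling BrpCollisionSketch.collisionRate(x, y, 2.0, 3, 10000, 42L) with the two feature vectors of the 17.59 pair as plain arrays should give a rough estimate of how likely that pair is to survive the hashing step, and repeating it for other bucketLength values shows how that likelihood changes.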