Why is observed false positive rate in Spark bloom filter higher than expected?

When creating a bloom filter out of a large number of elements (>400 million or so) with an fpp (false positive rate) of 1% in Spark, the observed false positive rate appears to be much higher, as much as 20%.

I do not believe this is a known limitation of bloom filters. At least I do not see anything like this in various descriptions of the bloom filter data structure. So is this a bug/limitation in Spark's implementation?

See my spark-shell session below for a demonstration:

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.0-amzn-0
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 17.0.10)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import java.security.MessageDigest
import java.security.MessageDigest

scala> import scala.util.Random
import scala.util.Random

scala> import org.apache.spark.util.sketch.BloomFilter
import org.apache.spark.util.sketch.BloomFilter

scala> 

scala> // Function to generate a random SHA1 hash

scala> def generateRandomSha1(): String = {
     |   val randomString = Random.alphanumeric.take(20).mkString
     |   val sha1 = MessageDigest.getInstance("SHA-1")
     |   sha1.update(randomString.getBytes("UTF-8"))
     |   val digest = sha1.digest
     |   digest.map("%02x".format(_)).mkString
     | }
generateRandomSha1: ()String

scala> 

scala> // Generate a DataFrame with 500 million rows of random SHA1 hashes

scala> val df = spark.range(500000000).map(_ => generateRandomSha1()).toDF("Hash")
df: org.apache.spark.sql.DataFrame = [Hash: string]

scala> // Create a bloom filter out of this collection of strings.

scala> val bloom_filter = df.stat.bloomFilter("Hash", 500000000, 0.01)
bloom_filter: org.apache.spark.util.sketch.BloomFilter = org.apache.spark.util.sketch.BloomFilterImpl@a14c0ba9

scala> // Generate another 10,000 random hashes

scala> val random_sha1s = List.fill(10000)(generateRandomSha1())
random_sha1s: List[String] = List(f3cbfd9bd836ea917ebc0dfc5330135cfde322a3, 4bff8d58799e517a1ba78236db9b52353dd39b56, 775bdd9d138a79eeae7308617f5c0d1d0e1c1697, abbd761b7768f3cbadbffc0c7947185856c4943d, 343692fe61c552f73ad6bc2d2d3072cc456da1db, faf4430055c528c9a00a46e9fae7dc25047ffaf3, 255b5d56c39bfba861647fff67704e6bc758d683, dae8e0910a368f034958ae232aa5f5285486a8ac, 3680dbd34437ca661592a7e4d39782c9c77fb4ba, f5b43f7a77c9d9ea28101a1848d8b1a1c0a65b82, 5bda825102026bc0da731dc84d56a499ccff0fe1, 158d7b3ce949422de421d5e110e3f6903af4f8e1, 2efcae5cb10273a0f5e89ae34fa3156238ab0555, 8d241012d42097f80f30e8ead227d75ab77086d2, 307495c98ae5f25026b91e60cf51d4f9f1ad7f4b, 8fc2f55563ab67d4ec87ff7b04a4a01e821814a3, b413572d14ee16c6c575ca3472adff62a8cbfa3d, 9219233b0e8afe57d7d5cb6...

scala> // Check how many of these random hashes return a positive result when passed into mightContain

scala> random_sha1s.map(c => bloom_filter.mightContain(c)).count(_ == true)
res0: Int = 2153

I would expect around 100 positive results here, i.e. 1%, but the actual rate is roughly 21.5% (2,153/10,000). With smaller bloom filters (around 100 million records, for example) the rate is much closer to the expected value. Why is this the case? Does the accuracy of the filter deteriorate with scale?

1 Answer

croncroncron:

I figured out what the issue is. This appears to be a bug in Spark.

Below is the function that puts new values into the set:

  @Override
  public boolean putBinary(byte[] item) {
    int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
    int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);

    long bitSize = bits.bitSize();
    boolean bitsChanged = false;
    for (int i = 1; i <= numHashFunctions; i++) {
      int combinedHash = h1 + (i * h2);
      // Flip all the bits if it's negative (guaranteed positive number)
      if (combinedHash < 0) {
        combinedHash = ~combinedHash;
      }
      bitsChanged |= bits.set(combinedHash % bitSize);
    }
    return bitsChanged;
  }

The hash functions return a 32-bit int, and negative values are bit-flipped to positive, so every combined hash lands between 0 and Integer.MAX_VALUE (about 2.15 billion). Even if the bit array contains more bits than that, only the first ~2.15 billion can ever be set or tested. This cap on the usable bits increasingly degrades the false positive rate for Bloom filters whose bit count m is significantly larger than Integer.MAX_VALUE, which at a 1% fpp happens once the number of items n exceeds roughly 225 million.
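
To see why this bites at 500 million elements, here is a rough back-of-the-envelope check that can be pasted into the same spark-shell. It uses the textbook Bloom filter sizing formula, not Spark's exact internals:

// Textbook sizing: m = -n * ln(p) / (ln 2)^2 -- a rough check, not Spark's code
val n = 500000000L
val p = 0.01
val m = math.ceil(-n * math.log(p) / (math.log(2) * math.log(2))).toLong
println(m)                    // ~4792529188 bits requested
println(Int.MaxValue.toLong)  // 2147483647 -- highest index the int-based hashing can reach

With less than half of the bit array reachable, the effective filter is far denser than intended. A ~100 million element filter at 1% fpp needs only about 0.96 billion bits, comfortably under the cap, which matches the observation that smaller filters behave as expected.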

This Bloom filter implementation was forked from Guava several years ago. Since then, the bug was noticed in Guava (in this issue: https://github.com/google/guava/issues/1119) and the Guava version no longer exhibits this behavior. It looks like the fix was never ported to Spark.
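
For reference, the fixed Guava strategy does the index arithmetic in 64-bit space: the two hash halves are longs, the sign bit is masked off instead of flipping all the bits, and the modulo is taken against the full long-sized bit count, so every bit in the array is reachable. Below is a rough Scala sketch of that approach (illustrative only; the name and structure are mine, not Guava's or Spark's actual code):

// Sketch of 64-bit index derivation in the style of the fixed Guava strategy.
// Hypothetical helper for illustration -- not part of any library API.
def bitIndices(h1: Long, h2: Long, numHashFunctions: Int, bitSize: Long): Seq[Long] = {
  var combined = h1
  (1 to numHashFunctions).map { _ =>
    // Mask the sign bit to stay non-negative, then index over the full
    // Long-sized bit count instead of capping at Int.MaxValue.
    val idx = (combined & Long.MaxValue) % bitSize
    combined += h2
    idx
  }
}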

I created a ticket with Apache: https://issues.apache.org/jira/browse/SPARK-47547