ReduceByKey with a byte array as the key


I would like to work with RDD pairs of Tuple2<byte[], obj>, but two byte[]s with the same contents are treated as distinct keys because arrays use reference equality (they don't override equals or hashCode).

I didn't see any way to pass in a custom comparer. I could convert the byte[] into a String with an explicit charset, but I'm wondering if there's a more efficient way.
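For reference, that workaround might look roughly like this (ISO-8859-1 is one charset that maps every byte value to a char, so the round-trip is lossless; sc is an assumed SparkContext):

import java.nio.charset.StandardCharsets

val pairs = sc.parallelize(Seq(
  "a".getBytes(StandardCharsets.UTF_8) -> 1,
  "a".getBytes(StandardCharsets.UTF_8) -> 2,
  "b".getBytes(StandardCharsets.UTF_8) -> 3))

val reduced = pairs
  .map { case (k, v) => (new String(k, StandardCharsets.ISO_8859_1), v) }
  .reduceByKey(_ + _)

reduced.collect()
// e.g. Array((a,3), (b,3)): both "a" keys collapse to one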

2 Answers

maasg (Best Answer)

Custom comparers are insufficient because Spark uses the hashCode of the key objects to assign keys to partitions. (At least the default HashPartitioner does; you could provide a custom partitioner that can deal with arrays, as sketched below.)
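To make that aside concrete, a partitioner that can deal with arrays could look roughly like this (ArrayKeyPartitioner is just an illustrative name, not a Spark class). Note that it only avoids the "Default partitioner cannot partition array keys" error; the shuffle still merges keys through hashCode/equals, so arrays with equal contents would remain separate keys, which is why the wrapper below is the actual fix.

import java.util.Arrays
import org.apache.spark.Partitioner

// hypothetical partitioner that hashes the array contents rather than the reference
class ArrayKeyPartitioner(override val numPartitions: Int) extends Partitioner {
  private def nonNegativeMod(x: Int): Int = {
    val raw = x % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
  override def getPartition(key: Any): Int = key match {
    case null           => 0
    case a: Array[Byte] => nonNegativeMod(Arrays.hashCode(a))
    case other          => nonNegativeMod(other.hashCode)
  }
}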

Wrapping the array to provide proper equals and hashCode addresses the issue. A lightweight wrapper does the trick:

class SerByteArr(val bytes: Array[Byte]) extends Serializable {
    // equality and hashing are based on the array contents, not the reference
    override val hashCode = bytes.deep.hashCode
    override def equals(obj: Any) = obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}

A quick test:

import scala.util.Random
val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.

// let's use the wrapper instead

val keyable = rdd.map(elem => new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100
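
And since the question asks about reduceByKey specifically, the same keys work there as well, for example:

// reduceByKey over the wrapped keys, counting occurrences per distinct content
val counts = keyable.map(arr => (arr, 1)).reduceByKey(_ + _)
counts.count
// 100 distinct keys, matching the groupByKey result above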
Holden

You could create a wrapper class and define your own equality / comparison functions. This is likely slightly faster than converting to a String, since you don't have to copy the array (although you still have an object allocation).
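
A rough sketch of that approach (ByteArrayKey is just an illustrative name): the wrapper holds a reference to the original array without copying it and delegates content comparison to java.util.Arrays.

import java.util.Arrays

// no-copy wrapper: keeps a reference to the original array and bases
// equality/hashing on its contents via java.util.Arrays
// caveat: because the bytes aren't copied, mutating the array later would
// break the hashCode/equals contract for keys already in use
class ByteArrayKey(val bytes: Array[Byte]) extends Serializable {
  override def hashCode: Int = Arrays.hashCode(bytes)
  override def equals(other: Any): Boolean = other match {
    case that: ByteArrayKey => Arrays.equals(this.bytes, that.bytes)
    case _                  => false
  }
}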