Kind of groupByKey on RDD[(K, V)] returning List[(K, RDD[V])]


I would like to split an RDD[(K, V)] into buckets so that the output type is List[(K, RDD[V])]; here is my proposal. But I'm not satisfied with it, because it relies on keysNumber passes over the original RDD. Is there another way that requires fewer passes over the original RDD? If not, what do you think about caching rest before the recursive call? It would surely be faster, but will Spark minimise the memory used, thanks to the lineage with the first RDD, or will it store roughly keysNumber progressively smaller versions of the original RDD? Thank you.


def kindOfGroupByKey[K : ClassTag, V : ClassTag](rdd: RDD[(K, V)], keys: List[K] = List.empty[K]): List[(K, RDD[V])] = {

    // If no keys are supplied, collect the distinct keys (one pass over rdd).
    val keysIn: List[K] = if (keys.isEmpty) rdd.map(_._1).distinct.collect.toList else keys

    @annotation.tailrec
    def go(rdd2: RDD[(K, V)], keys: List[K], output: List[(K, RDD[V])]): List[(K, RDD[V])] =
        keys match {
            // Matching on the list instead of destructuring with
            // `val currentKey :: keyxs = keys` avoids a MatchError on the
            // final call, when keys has become empty.
            case Nil => output.reverse
            case currentKey :: keyxs =>
                val filtered = rdd2.filter(_._1 == currentKey)
                val rest = rdd2.filter(_._1 != currentKey)

                val updatedOutput = (currentKey, filtered.map(_._2)) :: output

                // Supposing rdd is cached: is it worth caching rest here, or would that
                // pile up many progressively smaller cached copies of rdd and overload RAM?
                go(rest, keyxs, updatedOutput)
        }

    go(rdd, keysIn, List.empty[(K, RDD[V])])

}
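For comparison, here is a minimal sketch of the variant that filters the original RDD directly for each key instead of recursing on a shrinking rest. Every sub-RDD's lineage is then one filter deep from the single cached parent, rather than up to keysNumber filters deep. The name splitByKey is hypothetical, not a Spark API, and materialising the sub-RDDs still launches one filter job per key:

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Sketch: one distinct pass to find the keys, then one shallow filter per
// key, all reading the same cached parent (hypothetical helper, not Spark API).
def splitByKey[K : ClassTag, V : ClassTag](rdd: RDD[(K, V)]): List[(K, RDD[V])] = {
  rdd.cache()  // a single cached copy shared by every filter below
  val keys = rdd.map(_._1).distinct.collect.toList
  keys.map(k => (k, rdd.filter(_._1 == k).map(_._2)))
}
```

With this shape there is only one dataset to cache, so the RAM question about caching each intermediate rest disappears; the trade-off is that each filter scans the full cached parent instead of an already-shrunk remainder.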
