zipWithUniqueId() in flambo using Clojure


I want to create an RDD in which each row has an index.

Given an RDD:

["a" "b" "c"]

I tried the following:

(defn make-row-index [input]
  (let [{:keys [col]} input]
    (swap! @rdd assoc :rdd (-> (:rdd xctx)
                          (f/map #(vector %1 %2 ) (range))))))

Desired output:

 (["a" 0] ["b" 1] ["c" 2])

I got an arity error, because flambo's f/map is called as (f/map rdd fn), not with several collections the way clojure.core/map is. I wanted to use Spark's zipWithUniqueId() instead, but I'm lost on how to call it, and I can't find an equivalent function in flambo. Any suggestions and help are appreciated.
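For comparison, here is what the attempted #(vector %1 %2) with (range) does on a plain in-memory Clojure vector rather than an RDD. clojure.core/map accepts several collections and stops at the shortest one, so pairing with the infinite (range) terminates. flambo's f/map instead takes one RDD and one function, which is why passing (range) as an extra argument fails. This is only a local sketch and does not touch Spark.

```clojure
;; Local illustration only: clojure.core/map on an in-memory vector, not an RDD.
(def rows ["a" "b" "c"])

;; clojure.core/map walks several collections in lockstep and stops at the
;; shortest one, so the infinite (range) is safe here.
(def indexed (map vector rows (range)))

;; map-indexed passes [index element]; swap them to get [element index].
(def indexed-alt (map-indexed (fn [i x] [x i]) rows))

(prn indexed)     ; prints (["a" 0] ["b" 1] ["c" 2])
(prn indexed-alt) ; prints (["a" 0] ["b" 1] ["c" 2])
```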

Apache-spark zip with Index

Map implementation in flambo

Thanks


Best answer:

You can call zipWithIndex on the RDD through Java interop and then map f/untuple over the result, which turns each scala.Tuple2 pair into a Clojure vector:

(def rdd (f/parallelize sc ["a" "b" "c"]))
(f/map (.zipWithIndex rdd) f/untuple)
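To see which index each element gets, here is a plain-Clojure model of the numbering the Spark documentation describes for zipWithIndex: indices follow partition order first and then the order of items within each partition, so they run from 0 to N-1 without gaps. The partition layout (a vector of vectors) and the zip-with-index-model function are hypothetical stand-ins for an RDD; no Spark is involved.

```clojure
;; Hypothetical partition layout standing in for an RDD with 2 partitions.
(def partitions [["a" "b"] ["c"]])

(defn zip-with-index-model
  "Model of zipWithIndex numbering: walk the partitions in order, then the
  items inside each partition, and number them consecutively from 0.
  Returns a seq of [element index] vectors, the shape f/untuple produces."
  [parts]
  (map vector (apply concat parts) (range)))

(prn (zip-with-index-model partitions)) ; prints (["a" 0] ["b" 1] ["c" 2])
```

Because the real method must know how many items sit in every earlier partition before it can assign these offsets, Spark documents zipWithIndex as triggering a job when the RDD has more than one partition.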

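You can use .zipWithUniqueId exactly the same way, but the result will differ from what you expect. zipWithUniqueId also generates (element, id) pairs, but the ids are not consecutive: according to the Spark documentation, items in the kth partition get ids k, n+k, 2n+k, ..., where n is the number of partitions. The ids are unique, but they can have gaps and don't follow the element order across partitions.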
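A plain-Clojure model of that id scheme makes the difference concrete. The partition layout and the zip-with-unique-id-model function are hypothetical illustrations of the documented formula, not flambo or Spark API.

```clojure
;; Hypothetical partition layout standing in for an RDD with 2 partitions.
(def partitions [["a" "b"] ["c"]])

(defn zip-with-unique-id-model
  "Model of the documented zipWithUniqueId scheme: the i-th item of
  partition k gets id k + i*n, where n is the number of partitions.
  Returns a seq of [element id] vectors."
  [parts]
  (let [n (count parts)]
    (apply concat
           (map-indexed (fn [k part]
                          (map-indexed (fn [i x] [x (+ k (* i n))]) part))
                        parts))))

;; "b" gets 2 and "c" gets 1: unique, but not in element order.
(prn (zip-with-unique-id-model partitions)) ; prints (["a" 0] ["b" 2] ["c" 1])
```

Each id depends only on the partition number and the position inside that partition, which is why this method does not need to count the other partitions first.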

It should also be possible to use zip, but as far as I can tell it doesn't work with an infinite range, so you have to build a finite index RDD of the same size first:

(def idx (f/parallelize sc (range (f/count rdd))))
(f/map (.zip rdd idx) f/untuple)
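zip pairs the two RDDs partition by partition, and Spark requires both sides to have the same number of partitions and the same number of elements in each partition. The model below mimics that requirement on nested vectors; zip-model and its error message are hypothetical and only show why the idx approach works only when idx ends up with the same partition layout as rdd.

```clojure
(defn zip-model
  "Model of RDD zip: pair elements partition by partition. Throws when the
  partition layouts differ, standing in for the errors Spark raises."
  [parts-a parts-b]
  (when (not= (map count parts-a) (map count parts-b))
    (throw (ex-info "Partition layouts differ; cannot zip"
                    {:left (map count parts-a) :right (map count parts-b)})))
  (mapcat (fn [pa pb] (map vector pa pb)) parts-a parts-b))

;; Same layout on both sides: works.
(prn (zip-model [["a"] ["b" "c"]] [[0] [1 2]])) ; prints (["a" 0] ["b" 1] ["c" 2])

;; Same total count but a different layout: fails.
(try
  (zip-model [["a"] ["b" "c"]] [[0 1] [2]])
  (catch clojure.lang.ExceptionInfo e
    (println (.getMessage e))))
```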

Whenever you use zip, you should be careful, though: generally speaking, an RDD should be considered an unordered collection if any shuffling is involved.
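One way to live with that is to attach the index before any shuffle: the index then travels with its element inside each pair, and the original order can be restored later by sorting on it. The sketch below only models the idea on a local vector, with clojure.core/shuffle standing in for a Spark operation that reorders records.

```clojure
;; [element index] pairs, as produced by zipWithIndex + f/untuple.
(def indexed [["a" 0] ["b" 1] ["c" 2]])

;; clojure.core/shuffle stands in for an operation that loses ordering.
(def reordered (shuffle indexed))

;; The index stayed attached to each element, so sorting on it restores order.
(def restored (sort-by second reordered))

(prn restored) ; prints (["a" 0] ["b" 1] ["c" 2])
```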