RDDs and vectors in Clojure


I need to filter out rows whose date is later than the maximum date found in a subset of a dataset (an RDD), namely the rows with a positive quantity. My idea is to determine that maximum date first and then use it to check whether a given row vector contains a later date. I tried the following:

(defn future-rows
  "input = { :col val :qty-col val}
   :col = Date column reference"
  [row input]
  (let [{:keys [col qty-col]} input
        get-qty-max-date (-> (:rdd @scope-rdd)
                             (f/map #(if-not (or (s/blank? (get % qty-col))
                                                 (not (pos? (read-string (get % qty-col)))))
                                       (get % col)
                                       false))
                             (f/reduce #(if (pos? (compare % %2)) %1 %2)))]
    (when-not (pos? (compare (get row col) get-qty-max-date)) row)))

Here row is a vector. The problem is that get-qty-max-date is an RDD rather than a single date value, so how do I make the comparison in the when-not form?

NB: the idea is that the future-rows function will be used as a predicate.

Given an RDD:

[[" " "2009/12/02"] ["4" "2005/02/08"] ["0" "2014/12/02"] ["5" "2005/08/01"] ["2" "2007/09/02"]]

When future-rows is used as a predicate, the desired output is:

[["4" "2005/02/08"] ["5" "2005/08/01"] ["2" "2007/09/02"]]

With input { :col 1 :qty-col 0 }, the maximum date determined by the function above is 2007/09/02 (rows with a blank or zero quantity are ignored). Hence the dates 2009/12/02 and 2014/12/02, which are later, are removed from the dataset.
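To pin down the expected behaviour independently of Spark, here is a minimal sketch on a plain Clojure vector. It assumes the data fits in memory and that quantities are integer strings (parsed with Long/parseLong instead of read-string); the names positive-qty?, max-date and remove-future-rows are hypothetical. It follows the two steps described above: compute the maximum date over rows with a positive quantity, then keep rows whose date is not after it.

```clojure
(require '[clojure.string :as s])

;; Sample rows from the question: [quantity date]
(def sample-rows
  [[" " "2009/12/02"] ["4" "2005/02/08"] ["0" "2014/12/02"]
   ["5" "2005/08/01"] ["2" "2007/09/02"]])

(defn positive-qty?
  "True when the quantity column holds a positive integer.
  Assumes quantities are integer strings."
  [qty-col row]
  (let [x (get row qty-col)]
    (and (not (s/blank? x))
         (pos? (Long/parseLong (s/trim x))))))

(defn max-date
  "Latest date among rows with a positive quantity, or nil if none qualify.
  Plain string comparison works because dates are zero-padded yyyy/MM/dd."
  [rows {:keys [col qty-col]}]
  (->> rows
       (filter #(positive-qty? qty-col %))
       (map #(get % col))
       (reduce (fn [a b] (if (pos? (compare a b)) a b)) nil)))

(defn remove-future-rows
  "Keeps rows whose date is not after the maximum date, i.e. the
  predicate role future-rows plays in the question."
  [rows {:keys [col] :as input}]
  (let [limit (max-date rows input)]
    (filterv #(not (pos? (compare (get % col) limit))) rows)))

(remove-future-rows sample-rows {:col 1 :qty-col 0})
;; => [["4" "2005/02/08"] ["5" "2005/08/01"] ["2" "2007/09/02"]]
```

The maximum is computed once, before filtering, so the filter predicate only compares against a plain string. In Spark the same shape applies: the maximum has to come out of an action that returns a local value, and the predicate then closes over it.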

I would also appreciate any other approach to this.

Say we have a main function that does this:

(defn remove-rows [xctx input]
  (f/filter (:rdd xctx) #(future-rows % { :col 1 :qty-col 0 })))

It should then produce the desired output.

Thanks!

Best answer:

I guess you're looking for something like this:

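(require '[flambo.api :as f]
         '[clojure.string :as s])

;; Returns a serializable predicate that keeps rows whose quantity column
;; holds a positive number; blank and zero quantities are rejected.
(defn not-empty-and-positive?
  [qty-col]
  (f/fn [row]
    (let [x (get row qty-col)]
      (and (not (s/blank? x)) (pos? (read-string x))))))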
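Whether a zero quantity counts decides which dates feed the maximum. A quick local check, using plain functions in place of f/fn (assumption: no Spark context is needed just to test the logic; qty-not-negative? and qty-positive? are hypothetical names), shows the difference on the question's sample rows:

```clojure
(require '[clojure.string :as s])

(def rows [[" " "2009/12/02"] ["4" "2005/02/08"] ["0" "2014/12/02"]
           ["5" "2005/08/01"] ["2" "2007/09/02"]])

;; Rejects only blank or negative quantities, so a quantity of 0 passes.
(defn qty-not-negative? [qty-col row]
  (let [x (get row qty-col)]
    (not (or (s/blank? x) (neg? (read-string x))))))

;; Requires a strictly positive quantity, so a quantity of 0 is rejected.
(defn qty-positive? [qty-col row]
  (let [x (get row qty-col)]
    (and (not (s/blank? x)) (pos? (read-string x)))))

(mapv second (filter #(qty-not-negative? 0 %) rows))
;; => ["2005/02/08" "2014/12/02" "2005/08/01" "2007/09/02"], max would be 2014/12/02
(mapv second (filter #(qty-positive? 0 %) rows))
;; => ["2005/02/08" "2005/08/01" "2007/09/02"], max is 2007/09/02
```

Only the strictly positive check yields 2007/09/02 as the maximum, which is the value the question expects.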


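;; Returns the maximum date as a plain value, not an RDD: .top is a Spark
;; action, so it brings a local java.util.List back to the driver.
(defn get-max-date
  [col qty-col]
  (-> (:rdd @scope-rdd)
      (f/filter (not-empty-and-positive? qty-col))
      (f/map (f/fn [row] (get row col)))
      (.top 1)
      (first)))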
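Taking the top element of the mapped dates relies on the natural ordering of the date strings. That matches calendar order only because the question's dates are zero-padded yyyy/MM/dd. A local sketch (no Spark; latest and parse-date are hypothetical helpers) shows the working case, the failure mode for unpadded dates, and a java.time parse as one possible fix if such dates can occur:

```clojure
(import '(java.time LocalDate)
        '(java.time.format DateTimeFormatter))

;; Same idea as taking the top element: keep the largest by `compare`.
(defn latest [dates]
  (reduce (fn [a b] (if (pos? (compare a b)) a b)) dates))

(latest ["2005/02/08" "2005/08/01" "2007/09/02"])
;; => "2007/09/02"

;; Lexicographic order only matches calendar order for zero-padded fields:
(pos? (compare "2005/8/1" "2005/10/01"))
;; => true, i.e. August wrongly sorts after October

;; Parsing first avoids that; "yyyy/M/d" accepts padded and unpadded fields.
(def fmt (DateTimeFormatter/ofPattern "yyyy/M/d"))
(defn parse-date [text] (LocalDate/parse text fmt))

(neg? (compare (parse-date "2005/8/1") (parse-date "2005/10/01")))
;; => true
```

LocalDate implements Comparable, so the same compare-based code works unchanged once the column holds parsed dates instead of strings.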


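;; Returns a predicate that keeps rows whose date is not after the maximum
;; date, so the row holding the maximum date itself is kept.
(defn is-past?
  [col qty-col]
  (let [max-date (get-max-date col qty-col)]
    (f/fn [row] (not (pos? (compare (get row col) max-date))))))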
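Whether the date comparison is strict matters for the row that holds the maximum date: the desired output in the question keeps ["2" "2007/09/02"]. A local sketch in plain Clojure (hypothetical names before-max? and not-after-max?; the maximum is hard-coded to the value from the question's example) shows the two variants side by side:

```clojure
(def dates ["2005/02/08" "2005/08/01" "2007/09/02"])
(def max-date "2007/09/02")

;; Strict: drops the date equal to the maximum.
(defn before-max? [d] (neg? (compare d max-date)))

;; Inclusive: keeps dates equal to the maximum, as the desired output requires.
(defn not-after-max? [d] (not (pos? (compare d max-date))))

(filterv before-max? dates)    ;; => ["2005/02/08" "2005/08/01"]
(filterv not-after-max? dates) ;; => ["2005/02/08" "2005/08/01" "2007/09/02"]
```

The inclusive form is also what the question's own when-not test expresses.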


;; Combined, e.g. with input = {:col 1 :qty-col 0} and xctx as in the question.
(defn remove-rows [xctx {:keys [col qty-col]}]
  (let [positive-qty? (not-empty-and-positive? qty-col)
        past? (is-past? col qty-col)]
    (-> (f/filter (:rdd xctx) positive-qty?) (f/filter past?) (f/collect))))