Convert Clojure vector to Flambo SQL row


I'm working on a function to convert a vector into an SQL row, so that I can further convert it to a data frame and save it into a table using SQLContext in Apache Spark. I'm developing in Clojure and I got lost along the way. I thought of implementing the solution like this:

  1. For each RDD (of vectors), convert it to rows
  2. Convert the rows to a data frame
  3. Save the data frame to a table
  4. Use the sqlContext to query for particular information in the table
  5. Convert the query result back into an RDD for further analysis

    (defn assign-ecom
      []
      (let [rdd-fields (-> (:rdd @transformed-rdd)
                           (f/map #(sql/row->vec %))
                           f/collect)]
        (clojure.pprint/pprint rdd-fields)))
    

I'm using the Flambo v0.60 API to abstract over the Apache Spark functions. I also welcome any suggestions on how to go about solving the problem. Thanks

Here's the link to Flambo row -> vec docs:

Flambo documentation:

Accepted answer:

I assume you already have a Spark context (sc) and an SQL context (sql-ctx). First, let's import all the classes we'll need:

(import org.apache.spark.sql.RowFactory)
(import org.apache.spark.sql.types.StructType)
(import org.apache.spark.sql.types.StructField)
(import org.apache.spark.sql.types.Metadata)
(import org.apache.spark.sql.types.DataTypes)
  1. For each RDD (of vectors), convert it to rows

    ;; Vector to Row conversion
    (defn vec->row [v] 
      (RowFactory/create (into-array Object v)))
    
    ;; Example data
    (def rows (-> (f/parallelize sc [["foo" 1] ["bar" 2]])
                  (f/map vec->row)))
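    ;; Quick REPL check of the helper (the exact printed form of a Row may
    ;; vary by Spark version):
    ;; (.toString (vec->row ["foo" 1]))
    ;; ;; => "[foo,1]"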
    
  2. Convert the rows to a data frame

    ;; Define schema
    (def schema
      (StructType.
       (into-array StructField
         [(StructField. "k" DataTypes/StringType false (Metadata/empty))
          (StructField. "v" DataTypes/IntegerType false (Metadata/empty))])))
    
    ;; Create data frame
    (def df (.createDataFrame sql-ctx rows schema))
    
    ;; See if it works
    (.show df)
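    ;; With the example data above, (.show df) should print something close
    ;; to the following (exact formatting may vary by Spark version):
    ;;
    ;; +---+---+
    ;; |  k|  v|
    ;; +---+---+
    ;; |foo|  1|
    ;; |bar|  2|
    ;; +---+---+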
    
  3. Save the data frame to a table

    (.registerTempTable df "df")
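    ;; Note: registerTempTable is the Spark 1.x API. If you are on Spark 2.x,
    ;; the equivalent call (not part of the original answer) is:
    ;; (.createOrReplaceTempView df "df")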
    
  4. Use the sqlContext to query for particular information in the table

    (def df-keys (.sql sql-ctx "SELECT UPPER(k) as k FROM df"))
    ;; Check results
    (.show df-keys)
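    ;; With the example data, the query result should look roughly like:
    ;;
    ;; +---+
    ;; |  k|
    ;; +---+
    ;; |FOO|
    ;; |BAR|
    ;; +---+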
    
  5. Convert the query result back into an RDD for further analysis

    (.toJavaRDD df-keys)
    

Or, if you want vectors:

    (f/map (.toJavaRDD df-keys) sql/row->vec)
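
Putting the steps together, here is a minimal sketch of how the original assign-ecom-style function could do the full round trip. It assumes sc, sql-ctx, the vec->row helper, and the schema defined above are in scope; the function and argument names are illustrative, not part of the original answer.

    (defn vecs->upper-keys
      "Vectors -> Rows -> DataFrame -> SQL query -> RDD of vectors."
      [vec-rdd]
      (let [rows (f/map vec-rdd vec->row)                   ; step 1: vectors -> Rows
            df   (.createDataFrame sql-ctx rows schema)]    ; step 2: Rows -> DataFrame
        (.registerTempTable df "df")                        ; step 3: expose as a table
        (-> (.sql sql-ctx "SELECT UPPER(k) AS k FROM df")   ; step 4: query it
            .toJavaRDD                                      ; step 5: back to an RDD...
            (f/map sql/row->vec))))                         ; ...of plain Clojure vectors

    ;; Example:
    ;; (f/collect (vecs->upper-keys (f/parallelize sc [["foo" 1] ["bar" 2]])))
    ;; ;; => (["FOO"] ["BAR"]), or an equivalent collection, depending on the Flambo version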