I am trying to rewrite the Spark Structured Streaming example in Clojure.
The original Scala version is in the programming guide:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
(ns flambo-example.streaming-example
  (:require [clojure.string])
  (:import [org.apache.spark.sql Encoders SparkSession Dataset Row functions]))
(def spark
  (-> (SparkSession/builder)
      (.appName "sample")
      (.master "local[*]")
      .getOrCreate))
(def lines
  (-> spark
      .readStream
      (.format "socket")
      (.option "host" "localhost")
      (.option "port" 9999)
      .load))
(def words
  (-> lines
      (.as (Encoders/STRING))
      (.flatMap #(clojure.string/split % #" "))))
The above code causes the following exception.
;; Caused by java.lang.IllegalArgumentException
;; No matching method found: flatMap for class
;; org.apache.spark.sql.Dataset
How can I avoid this error?
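The error comes from Clojure's reflector: at runtime it looks for a public `flatMap` on `Dataset` that takes exactly one argument. A small REPL helper that lists the overloads that actually exist makes the mismatch visible. It assumes Spark is on the classpath; the namespace and `flat-map-signatures` are hypothetical names.

```clojure
(ns streaming-example.inspect
  (:import [java.lang.reflect Method]
           [org.apache.spark.sql Dataset]))

(defn flat-map-signatures
  "Returns a set with one vector of parameter class names per public
  flatMap overload declared on or inherited by org.apache.spark.sql.Dataset."
  []
  (->> (.getMethods Dataset)
       ;; keep only methods named flatMap
       (filter (fn [^Method m] (= "flatMap" (.getName m))))
       ;; turn each overload into a vector of its parameter class names
       (map (fn [^Method m]
              (mapv (fn [^Class c] (.getName c)) (.getParameterTypes m))))
       set))
```

On Spark 2.x/3.x, `(flat-map-signatures)` should include `["scala.Function1" "org.apache.spark.sql.Encoder"]` and `["org.apache.spark.api.java.function.FlatMapFunction" "org.apache.spark.sql.Encoder"]`: every overload wants the function plus an `Encoder`, so a one-argument call has nothing to match.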
You have to follow the signatures. The Java `Dataset` API provides two implementations of `Dataset.flatMap`: one which takes a `scala.Function1` and one which takes Spark's own `o.a.s.api.java.function.FlatMapFunction`. Both also take an `Encoder` as a second argument, so a one-argument call like yours has nothing to match, and a plain Clojure fn implements neither interface anyway. The former one is rather useless for you, but you should be able to use the latter one. For the `RDD` API, flambo uses macros to create Spark-friendly adapters which can be accessed with `flambo.api/fn`. I am not sure if these will work directly with `Dataset`s, but you should be able to adjust them if you need to.

Since you cannot depend on implicit `Encoders`, you also have to provide an explicit encoder which matches the return type. Overall you'll need something around `(.flatMap ds f e)`, where `ds` is your typed `Dataset`, `f` implements `FlatMapFunction` and `e` is an `Encoder`. One option for `f` is to `reify` the interface, but I guess it is possible to find a better one.
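A minimal sketch of the `reify` route, assuming Spark 2.x or later (where `FlatMapFunction.call` returns a `java.util.Iterator`) and a namespace that is AOT-compiled, since a class generated by `reify` at the REPL may not be loadable when Spark deserializes the task. The namespace, `split-words` and `words` are hypothetical names.

```clojure
(ns streaming-example.words
  (:require [clojure.string :as str])
  (:import [org.apache.spark.api.java.function FlatMapFunction]
           [org.apache.spark.sql Dataset Encoders]))

;; A FlatMapFunction<String, String>: Spark calls `call` once per input
;; element and expects a java.util.Iterator over the output elements.
(def split-words
  (reify FlatMapFunction
    (call [_ line]
      ;; clojure.string/split returns a vector, and Clojure vectors
      ;; implement java.util.List, so .iterator is available.
      (.iterator (str/split line #" ")))))

(defn words
  "Views a single-string-column DataFrame (such as the socket source's
  output) as Dataset<String> and splits every line into words."
  [^Dataset lines]
  (-> lines
      ;; Dataset<Row> -> Dataset<String>
      (.as (Encoders/STRING))
      ;; Two arguments: the function plus an explicit Encoder for the
      ;; result type, since Scala's implicit Encoder is not passed for us.
      (.flatMap split-words (Encoders/STRING))))
```

With the question's `lines`, `(words lines)` gives a streaming `Dataset` of strings; like any streaming query it still needs a sink (`.writeStream` ... `.start`) before anything runs.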
In practice I'd avoid the typed `Dataset` altogether and focus on `DataFrame` (`Dataset[Row]`).
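A sketch of that untyped route, assuming the input has the single string column `value` that the socket source produces. It relies on the built-in `functions/split` and `functions/explode` instead of a Clojure function, so no Clojure-generated class has to be shipped to the executors. The namespace and `words-df` are hypothetical names.

```clojure
(ns streaming-example.words-df
  (:import [org.apache.spark.sql Column Dataset functions]))

(defn words-df
  "Splits the `value` column of a DataFrame (the column produced by the
  socket source) into one row per word, in a column named `word`."
  [^Dataset lines]
  (.select lines
           ;; select(Column...) is a Java varargs method, so Clojure has to
           ;; pass an explicit Column array.
           (into-array Column
                       [(-> (functions/col "value")
                            (functions/split " ")  ; Column of string arrays
                            functions/explode      ; one row per array element
                            (.as "word"))])))
```

From there, a word count along the lines of the guide could be `(-> (words-df lines) (.groupBy "word" (into-array String [])) .count)`, using the `groupBy(String, String...)` overload.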