I am trying to rewrite a Spark Structured Streaming example in Clojure.
The original example is written in Scala in the Structured Streaming Programming Guide:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

My Clojure version so far:
```clojure
(ns flambo-example.streaming-example
  (:require [clojure.string])
  (:import [org.apache.spark.sql Encoders SparkSession Dataset Row functions]))

(def spark
  (-> (SparkSession/builder)
      (.appName "sample")
      (.master "local[*]")
      .getOrCreate))

;; Read lines of text from a socket, as in the Scala guide
(def lines
  (-> spark
      .readStream
      (.format "socket")
      (.option "host" "localhost")
      (.option "port" 9999)
      .load))

(def words
  (-> lines
      (.as (Encoders/STRING))
      ;; this call throws the exception shown below
      (.flatMap #(clojure.string/split % #" "))))
```
The above code causes the following exception.
```
;; Caused by java.lang.IllegalArgumentException
;; No matching method found: flatMap for class
;; org.apache.spark.sql.Dataset
```
How can I avoid this error?
You have to follow the signatures. The Java `Dataset` API provides two implementations of `Dataset.flatMap`: one which takes a `scala.Function1` and a second one which takes Spark's own `org.apache.spark.api.java.function.FlatMapFunction`. Both also take an `Encoder` as a second argument, so your call, which passes a single Clojure function, matches neither of them; hence the `No matching method found` error. The former one is rather useless for you, but you should be able to use the latter one.

For the `RDD` API, `flambo` uses macros to create Spark-friendly adapters, which can be accessed with `flambo.api/fn`. I am not sure if these will work directly with `Dataset`s, but you should be able to adjust them if you need to.

Since you cannot depend on implicit `Encoders`, you also have to provide an explicit encoder which matches the return type. Overall you'll need a call of the form `(.flatMap ds f e)`, where `ds` is your typed `Dataset`, `f` implements `FlatMapFunction` and `e` is an `Encoder`. A straightforward implementation is possible along these lines, but I guess a better one can be found.
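A minimal sketch of the typed approach, assuming Spark 2.x or later (where `FlatMapFunction.call` returns a `java.util.Iterator`). The namespace and the helpers `flat-map-fn`, `split-line` and `lines->words` are hypothetical; `flat-map-fn` is a hand-rolled stand-in for a flambo-style adapter, not part of flambo's API.

```clojure
(ns flambo-example.typed-words            ; hypothetical namespace
  (:require [clojure.string :as str])
  (:import [org.apache.spark.api.java.function FlatMapFunction]
           [org.apache.spark.sql Dataset Encoders]))

(defn flat-map-fn
  "Hypothetical helper: wraps a Clojure fn that returns a collection
  into Spark's Java FlatMapFunction, the Dataset.flatMap overload
  that can be called from Clojure."
  ^FlatMapFunction [f]
  (reify FlatMapFunction
    ;; Since Spark 2.0, call must return a java.util.Iterator.
    (call [_ x]
      (.iterator ^Iterable (f x)))))

(defn split-line
  "Hypothetical helper: splits a line on single spaces."
  [^String line]
  (str/split line #" "))

(defn lines->words
  "Hypothetical helper: turns the streaming DataFrame of lines into a
  typed Dataset<String> of words."
  ^Dataset [^Dataset lines]
  (-> lines
      (.as (Encoders/STRING))
      ;; Encoders are not implicit outside Scala, so pass one explicitly
      ;; for the String results.
      (.flatMap (flat-map-fn split-line) (Encoders/STRING))))
```

With the `lines` from the question this would be `(def words (lines->words lines))`. Because `reify` generates its class at runtime, running this on a real cluster may require AOT-compiling the namespace so that executors can load the class.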
In practice I'd avoid the typed `Dataset` API altogether and focus on `DataFrame` (`Dataset[Row]`).
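A hedged sketch of that untyped route, assuming the socket source's single string column has its default name `value`. It swaps the custom split function for Spark's built-in `split` and `explode` column functions, so no serializable Clojure function is needed at all. The namespace and `lines->word-df` are hypothetical names.

```clojure
(ns flambo-example.untyped-words          ; hypothetical namespace
  (:import [org.apache.spark.sql Column Dataset functions]))

(defn lines->word-df
  "Hypothetical helper: explodes the `value` column into one row per
  word, staying in the untyped DataFrame (Dataset<Row>) API."
  ^Dataset [^Dataset lines]
  (.select lines
           ;; Dataset.select(Column...) is Java varargs, so Clojure has
           ;; to pass an explicit Column array.
           (into-array Column
                       [(-> (functions/col "value")
                            (functions/split " ")
                            (functions/explode)
                            (.alias "word"))])))
```

With the question's `lines`, `(lines->word-df lines)` yields a streaming `DataFrame` with a single `word` column, which can then be grouped and counted as in the guide.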