When I set environment variable using Intellij below code works, but when i deploy code with spark-submit it does not work since environment variables are not exits on entire cluster.
import com.hepsiburada.util.KafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, date_format, to_date}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Minutes, StreamingContext}
object StreamingApplication {
private val appName: String = sys.env("APPNAME")
private val bootstrapServers: String = sys.env("BOOTSTRAPSERVERS")
private val topic: String = sys.env("TOPIC")
private val consumerGroupId: String = sys.env("CONSUMERGROUPID")
private val batchIntervalMinutes = sys.env("BATCHINTERVALMIN").toLong
private val outputPathHdfs: String = sys.env("OUTPUTPATHHDFS")
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.set("spark.app.name", appName)
.set("spark.streaming.stopGracefullyOnShutdown", "true")
.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
.set("spark.sql.session.timeZone", "UTC")
val streamingContext = new StreamingContext(conf, Minutes(batchIntervalMinutes))
streamingContext.sparkContext.setLogLevel("WARN")
val kafkaStream = KafkaUtil.getKafkaStream(streamingContext, topic, bootstrapServers, consumerGroupId)
val spark = SparkSession.builder.config(streamingContext.sparkContext.getConf).getOrCreate()
kafkaStream.foreachRDD { rdd =>
val rdd_value = rdd.map(_.value().toString)
if (!rdd_value.isEmpty()) {
val df_init = spark.read.json(rdd_value)
val df = df_init.withColumn("dy", to_date(col("receivedat"))).withColumn("hr", date_format(col("receivedat"), "HH"))
df.coalesce(1)
.write.mode(SaveMode.Append)
.format("json")
.partitionBy("dy", "hr")
.save(outputPathHdfs)
}
}
streamingContext.start()
streamingContext.awaitTermination()
}
}
What i have tried so far:
spark.yarn.appMasterEnv, spark.yarn.appExecutorEnv configuration to pass environment variables to the cluster but that does not works.
I also tried to set variables as conf like --conf spark.Variable=x and spark.get("Variable") instead of sys.env("Variable") but that does not worked as well
Here is the error:
22/04/17 17:08:40 ERROR ApplicationMaster: User class threw exception: java.util.NoSuchElementException: APPNAME
java.util.NoSuchElementException: APPNAME
at org.apache.spark.SparkConf.$anonfun$get$1(SparkConf.scala:245)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.SparkConf.get(SparkConf.scala:245)
at com.hepsiburada.StreamingApplication$.main(StreamingApplication.scala:18)
at com.hepsiburada.StreamingApplication.main(StreamingApplication.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)