How can I use environment variables in Spark when the application is deployed in cluster mode?

1.5k Views Asked by At

When I set the environment variables in IntelliJ, the code below works, but when I deploy the code with spark-submit it does not, since the environment variables do not exist on the cluster nodes.

import com.hepsiburada.util.KafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, date_format, to_date}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Minutes, StreamingContext}

/** Spark Streaming job: consumes JSON messages from a Kafka topic and appends
  * them to HDFS as JSON files partitioned by day (`dy`) and hour (`hr`).
  *
  * All runtime settings come from environment variables. When run on YARN in
  * cluster mode those variables must be forwarded to the ApplicationMaster
  * via `--conf spark.yarn.appMasterEnv.NAME=value` (and to executors via
  * `spark.executorEnv.NAME=value`), otherwise they are absent on the cluster.
  */
object StreamingApplication {

  /** Fetches a required environment variable, failing fast with a message
    * that names the missing variable.
    *
    * Plain `sys.env(name)` throws `java.util.NoSuchElementException` whose
    * message is only the key (e.g. "APPNAME"), which is hard to diagnose when
    * it surfaces during object initialization on the ApplicationMaster.
    *
    * @param name environment variable name
    * @return the variable's value
    * @throws IllegalArgumentException if the variable is not set
    */
  private def requiredEnv(name: String): String =
    sys.env.getOrElse(
      name,
      throw new IllegalArgumentException(
        s"Required environment variable '$name' is not set. " +
          "On YARN cluster mode, forward it with --conf spark.yarn.appMasterEnv." + name + "=<value>"
      )
    )

  // Object fields are initialized lazily on first access (Scala object
  // semantics), i.e. when main() first touches them on the driver/AM.
  private val appName: String = requiredEnv("APPNAME")
  private val bootstrapServers: String = requiredEnv("BOOTSTRAPSERVERS")
  private val topic: String = requiredEnv("TOPIC")
  private val consumerGroupId: String = requiredEnv("CONSUMERGROUPID")
  // Validate the numeric value explicitly so a bad setting reports the
  // variable name instead of a bare NumberFormatException.
  private val batchIntervalMinutes: Long = {
    val raw = requiredEnv("BATCHINTERVALMIN")
    try raw.toLong
    catch {
      case _: NumberFormatException =>
        throw new IllegalArgumentException(
          s"Environment variable 'BATCHINTERVALMIN' must be a long, got: '$raw'"
        )
    }
  }
  private val outputPathHdfs: String = requiredEnv("OUTPUTPATHHDFS")

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .set("spark.app.name", appName)
      // Finish in-flight batches before shutting down on SIGTERM.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
      .set("spark.sql.legacy.timeParserPolicy", "LEGACY")
      .set("spark.sql.session.timeZone", "UTC")

    val streamingContext = new StreamingContext(conf, Minutes(batchIntervalMinutes))
    streamingContext.sparkContext.setLogLevel("WARN")

    val kafkaStream = KafkaUtil.getKafkaStream(streamingContext, topic, bootstrapServers, consumerGroupId)

    // Reuse the StreamingContext's configuration so both APIs share one SparkContext.
    val spark = SparkSession.builder.config(streamingContext.sparkContext.getConf).getOrCreate()

    kafkaStream.foreachRDD { rdd =>
      val values = rdd.map(_.value().toString)
      // NOTE: RDD.isEmpty triggers a small job; it avoids writing empty batches.
      if (!values.isEmpty()) {
        val parsed = spark.read.json(values)
        // Derive partition columns from the record's `receivedat` timestamp:
        // dy = calendar date, hr = two-digit hour (UTC per session time zone).
        val withPartitions = parsed
          .withColumn("dy", to_date(col("receivedat")))
          .withColumn("hr", date_format(col("receivedat"), "HH"))
        // coalesce(1): one output file per dy/hr partition directory per batch.
        withPartitions.coalesce(1)
          .write.mode(SaveMode.Append)
          .format("json")
          .partitionBy("dy", "hr")
          .save(outputPathHdfs)
      }
    }
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

What I have tried so far:

The spark.yarn.appMasterEnv and spark.yarn.appExecutorEnv configurations to pass environment variables to the cluster, but that does not work.

I also tried to set the variables as configuration, e.g. --conf spark.Variable=x together with spark.get("Variable") instead of sys.env("Variable"), but that did not work either.

Here is the error:

22/04/17 17:08:40 ERROR ApplicationMaster: User class threw exception: java.util.NoSuchElementException: APPNAME
java.util.NoSuchElementException: APPNAME
    at org.apache.spark.SparkConf.$anonfun$get$1(SparkConf.scala:245)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.SparkConf.get(SparkConf.scala:245)
    at com.hepsiburada.StreamingApplication$.main(StreamingApplication.scala:18)
    at com.hepsiburada.StreamingApplication.main(StreamingApplication.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
0

There are 0 best solutions below