Handling Out-of-Memory Errors in Spark on Kubernetes: Need Assistance in Stopping SparkContext


I'm currently working on a Spark application deployed on Kubernetes, where I have multiple executor instances and one driver instance. I've encountered an issue where one of the executor instances gets stuck due to an Out-of-Memory (OOM) error. To address this, I'm attempting to capture the OOM error and subsequently stop the entire SparkContext.

Here's the code snippet I'm using to capture the OOM error and stop the Spark Context:

@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
    String reason = executorRemoved.reason();
    LOG.info("onExecutorRemoved reason: {}", reason);
    // reason prints as:
    // The executor with id 1 exited with exit code 137(SIGKILL, possible container OOM).
    if (reason != null && reason.contains("exit code 137")
            && reason.contains("possible container OOM")) {
        SparkContext sparkContext = sparkSession.sparkContext();
        sparkContext.stop();
    }
}

The error I am getting:

org.apache.spark.SparkException: Cannot stop SparkContext within listener bus thread.
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2065)
    at com.clearwateranalytics.washsales.v2.SparkEventListener.onExecutorRemoved(SparkEventListener.java:55)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:65)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1447)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

Question #1: What is the optimal method for detecting and handling Out-of-Memory (OOM) errors when encountered by any Spark executor?

Question #2: Given my current approach, where would be the appropriate location to stop the SparkContext, considering that it is not permissible within a listener?
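One workaround I'm considering is to hand the stop off to a separate thread from inside the listener, since the exception only forbids calling stop() on the listener-bus thread itself. Below is a minimal, self-contained sketch of that pattern; the `AtomicBoolean`, `CountDownLatch`, and the `onExecutorRemoved(String)` signature are stand-ins of my own for illustration, and in the real listener the marked line would be `sparkContext.stop()` instead:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopOutsideListener {
    // Stand-in for the effect of SparkContext.stop().
    static final AtomicBoolean stopped = new AtomicBoolean(false);
    static final CountDownLatch done = new CountDownLatch(1);

    // Mimics onExecutorRemoved: detect the OOM reason, but delegate the
    // actual stop to a freshly spawned thread rather than doing it on
    // the (simulated) listener-bus thread.
    static void onExecutorRemoved(String reason) {
        if (reason != null && reason.contains("exit code 137")
                && reason.contains("possible container OOM")) {
            Thread stopper = new Thread(() -> {
                stopped.set(true); // real code: sparkContext.stop();
                done.countDown();
            }, "spark-context-stopper");
            stopper.start();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        onExecutorRemoved("The executor with id 1 exited with "
                + "exit code 137(SIGKILL, possible container OOM).");
        done.await(); // wait for the stopper thread to finish
        System.out.println("stopped=" + stopped.get());
    }
}
```

I'd still like to know whether this is safe, or whether there is a more idiomatic place to trigger the shutdown.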

Question #3: I am open to any other suggestions regarding my use case.
