Apache Spark job failed immediately without retry; setting maxFailures doesn't work


I was testing a web crawling/scraping program on Apache Spark locally on my computer.

The program uses a few RDD transformations that take a volatile function that sporadically fails. (The function's purpose is to transform URL links into web pages; sometimes the headless browser it invokes just blacks out or gets overloaded - I can't avoid that.)

I heard that Apache Spark has powerful failover and retry features: any unsuccessful transformation or lost data can be recalculated from scratch from whatever resources it can find (sounds like magic, right?), so I didn't put any failover or try-catch logic in my code.

This is my spark configuration:

val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough

Unfortunately the job failed after the majority of stages and individual tasks had succeeded. The latest log in the console shows:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...

Looks like Spark just gives up cowardly after failing once. How do I configure it properly to make it more tenacious?

(My program can be downloaded from https://github.com/tribbloid/spookystuff - sorry for the scarce and disorganized code/documentation, I only started it a few days ago.)

ADD: if you want to try it yourself, the following code demonstrates the problem:

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Spark Pi")
  conf.setMaster("local[*]")
  conf.setSparkHome(System.getenv("SPARK_HOME"))
  conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
  conf.set("spark.task.maxFailures", "400000")
  val sc = new SparkContext(conf)
  val slices = if (args.length > 0) args(0).toInt else 8
  val n = 100000 * slices
  val count = sc.parallelize(1 to n, slices).map { i =>
    val x = java.lang.Math.random()
    if (x > 0.9) throw new IllegalStateException("the map has a 10% chance of failing")
    x
  }.reduce(_ + _)
  sc.stop()
  println("finished")
}

It should be noted that the same IllegalStateException got retried 32 times in this post: Apache Spark Throws java.lang.IllegalStateException: unread block data


There are 3 answers below.

BEST ANSWER

Let me forward the most authoritative answer:

If this is a useful feature for local mode, we should open a JIRA to document the setting or improve it (I’d prefer to add a spark.local.retries property instead of a special URL format). We initially disabled it for everything except unit tests because 90% of the time an exception in local mode means a problem in the application, and we’d rather let the user debug that right away rather than retrying the task several times and having them worry about why they get so many errors.

Matei

ANSWER

I know it's a very old question, but I had exactly the same problem and came across this question while looking for a solution.

There are 3 master URL formats for submitting a Spark application in local mode:

  • local - one thread (no parallelism), no retries
  • local[K] (or local[*]) - uses K (or the number of logical cores) worker threads and sets task.maxFailures to 1
  • local[K, F] (or local[*, F]) - sets task.maxFailures=F, which is what we were after (see the sketch below)

Consult the Spark documentation for details.
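
For example, the question's own configuration could be switched to the third format (a minimal sketch; the failure threshold of 4 is an arbitrary illustration, not a recommended value):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MoreLinkedIn")
  // all available cores, and allow each task to fail up to 4 times
  // before the whole stage (and job) is aborted
  .setMaster("local[*, 4]")
val sc = new SparkContext(conf)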

ANSWER

This works for me -

val sparkConfig = new SparkConf()
  .set("spark.task.maxFailures", "2")
  .set("spark.master", "local[2, 2]")

I had to set both in order to see my task fail (by throwing an exception) and then get reattempted in my local test environment.
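
Putting the answers together with the demo from the question, a complete sketch might look like the following (the object name, the retry threshold of 4, and the much smaller per-element failure probability are my own illustrative choices - with the original 10%-per-element rate, every attempt of a 100000-element task would almost surely fail no matter how many retries are allowed):

import org.apache.spark.{SparkConf, SparkContext}

object RetryDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Spark Pi with retries")
      // local[*, 4]: use all cores, retry each task up to 4 times
      .setMaster("local[*, 4]")
    val sc = new SparkContext(conf)

    val slices = if (args.length > 0) args(0).toInt else 8
    val n = 100000 * slices
    val sum = sc.parallelize(1 to n, slices).map { i =>
      val x = java.lang.Math.random()
      // tiny per-element failure probability, so a retried task has a
      // realistic chance of succeeding on a later attempt
      if (x < 1e-6) throw new IllegalStateException("simulated flaky element")
      x
    }.reduce(_ + _)

    println("finished, sum = " + sum)
    sc.stop()
  }
}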