Apache Spark - Exception/Error handling and Exception/Error propagation


I have some Java code running on Spark which I more or less maintain.

The code consumes data from Kafka and writes to a few data stores, so we call these small Spark Java programs consumers.

Environment where this code runs:

  • Amazon EMR release: emr-5.30.0
  • Installed applications: Spark 2.4.5

I've noticed that exceptions which occur on the worker/executor nodes are sometimes not propagated to the main driver program.

Is there any good reference where I can read up on Java exception handling and exception propagation in code running on Spark? How does it work in principle, and what are the general guidelines? Is the behavior (propagated / not propagated) different depending on the type of exception (RuntimeException vs. non-RuntimeException)?

I went through multiple posts and articles on the web, but I was unable to find much decent, in-depth information. The closest question to mine seems to be this one:

Apache Spark - Exception handling inside foreachRDD

But there's no answer there.

Also, when looking at our Java code, I find it hard to figure out which code runs on the workers and which does not (i.e. which is part of the driver program only). Any advice about that?

Is it true e.g. that code passed in to javaInputDStream.foreachRDD runs on the workers/executors while the rest is part of the driver program?

The exception which is not propagated to the driver program seems to be thrown in a function passed to javaRDD.mapPartitions; that call to mapPartitions is itself inside foreachRDD. Is that lack of exception propagation normal/expected?

Any hints or ideas are welcome.
Many thanks in advance.

Unfortunately I cannot really post code here in this question. Sorry about that.
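
To at least illustrate the shape of the code, here is a simplified, hypothetical sketch (not our actual code; transformRecord() and writeToStore() are made-up stand-ins, and streamingContext, topics and kafkaParams are assumed to be set up at startup):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// Kafka direct stream, as in the spark-streaming-kafka-0-10 integration.
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        streamingContext,                                  // JavaStreamingContext created at startup
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

stream.foreachRDD(rdd -> {
    JavaRDD<String> transformed = rdd.mapPartitions(records -> {
        List<String> out = new ArrayList<>();
        while (records.hasNext()) {
            // transformRecord() stands in for our real per-record logic;
            // the exception that never reaches the driver is thrown in here.
            out.add(transformRecord(records.next()));
        }
        return out.iterator();
    });

    // The writes to the downstream data stores happen via an action on the RDD.
    transformed.foreachPartition(partition -> writeToStore(partition));
});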

Answer by DMH:

I would expect that any exception on an executor that isn't caught, or that results in the executor process exiting with a nonzero status, would be propagated to the driver. But if you catch an exception and try to recover from it, it might not be. If you're catching an exception on the executor and think it should be reflected on the driver, try throwing a RuntimeException from your catch block and see what happens.
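
For example, a hypothetical sketch of rethrowing from the catch block (writeRecord() is a made-up stand-in for your real per-record write logic):

rdd.foreachPartition(records -> {
    while (records.hasNext()) {
        ConsumerRecord<String, String> record = records.next();
        try {
            writeRecord(record);   // made-up stand-in for the real write to the data store
        } catch (Exception e) {
            // Swallowing (or only logging) the exception here lets the task finish
            // successfully, so the driver never sees a failure.
            // Rethrowing as an unchecked exception fails the task; once Spark has
            // exhausted the task retries, the action on the driver fails as well
            // (typically with a SparkException wrapping this one as its cause).
            throw new RuntimeException(
                "Failed to write record at offset " + record.offset(), e);
        }
    }
});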

Regarding which code runs on the executors vs. the driver: in general, any function you pass into a Spark API method (the lambdas given to mapPartitions, foreachPartition, map, etc.) should be expected to run on an executor, while anything that isn't (e.g. parsing runtime arguments or reading a config file passed in at startup) runs on the driver only. One subtlety in your case: the function you pass to foreachRDD is itself invoked on the driver, once per micro-batch; it is the functions passed to the RDD operations inside it, such as your mapPartitions function, that get serialized and run on the executors.
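
So on the driver side, the natural place to observe such a failure is around the action that triggers the job, inside the foreachRDD function. A rough sketch, where transformRecords() and the logger are made-up placeholders:

stream.foreachRDD(rdd -> {
    try {
        // count() is an action: it ships the mapPartitions function to the
        // executors and blocks here, on the driver, until the job finishes or fails.
        long n = rdd.mapPartitions(records -> transformRecords(records)).count();
        log.info("Batch processed {} records", n);
    } catch (Exception e) {
        // A task that keeps failing surfaces here, usually as a SparkException
        // whose cause is the exception originally thrown on the executor.
        log.error("Batch failed", e);
        throw e;   // rethrow if the failure should stop the streaming job
    }
});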