I am trying to understand the correct way to handle errors when using core.async/pipeline. My pipeline is the following:
input --> xf-run-computation --> first-out
first-out --> xf-run-computation --> last-out
Here xf-run-computation makes an HTTP call and returns the response. However, some of these responses will be errors. What's the best way to handle these errors?
My solution is to split the output channels into success-values and error-values and then merge the error channels back into one channel:
    (let [[success-values1 error-values1] (split fn-to-split first-out)
          [success-values2 error-values2] (split fn-to-split last-out)
          errors (merge [error-values1 error-values2])]
      (pipeline 4 first-out xf-run-computation input)
      (pipeline 4 last-out xf-run-computation success-values1)
      [last-out errors])
So my function will return the last results and the errors.
Generally speaking, what "the" correct way is probably depends on your application's needs, but given your problem description, I think there are three things you need to consider:
1. xf-run-computation returns data that your business logic would see as errors,
2. xf-run-computation throws an exception, and
3. xf-run-computation might never finish (or not finish in time).

Regarding point 3, the first thing you should consider is using pipeline-blocking instead of pipeline.

I think your question is mostly related to point 1. The basic idea is that xf-run-computation needs to return a data structure (say a map or a record) that clearly marks a result as an error or a success, e.g. {:title nil :body nil :status "error"}. This gives you some options for dealing with the situation:

- all your later code simply ignores input data which has :status "error", i.e. your xf-run-computation would contain a line like (when (not (= (:status input) "error")) (run-computation input)),
- you could run a filter on all results between the pipeline calls and filter them as needed (note that filter can also be used as a transducer in a pipeline, which makes the old filter> and filter< functions of core.async obsolete),
- you use async/split like you suggested / Alan Thompson shows in his answer to filter out the error values to a separate error channel. There is no real need to have a second error channel for your second pipeline if you're going to merge the values anyway; you can simply re-use your error channel.

For point 2, the problem is that any exception in xf-run-computation happens in another thread and will not simply propagate back to your calling code. But you can make use of the ex-handler argument to pipeline (and pipeline-blocking). You could either simply filter out all exceptions, put the result on a separate exception channel, or try to catch them and turn them into errors (potentially putting them back on the result or another error channel). The latter only makes sense if the exception gives you enough information, e.g. an id or something that allows you to tie the exception back to the input which caused it. You could arrange for this in xf-run-computation (i.e. catch any exception thrown from a third-party library like the http call).

For point 3, the canonical answer in core.async would be to point to a timeout channel, but this doesn't make much sense in relation to pipeline. A better idea is to ensure that a timeout is set on your http calls, e.g. the :timeout option of http-kit or :socket-timeout and :conn-timeout of clj-http. Note that these options will usually result in an exception on timeout.
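
To make points 1 and 2 concrete, here is a minimal sketch of how the pieces could fit together. It is not your actual code: run-computation is a hypothetical stand-in for the http call (it fakes a business-level error for negative values and throws for the value 13), and the names ok?, ex->error, two-stage-pipeline and run-demo are made up for illustration. Results are maps marked with :status, exceptions are turned into error maps via the ex-handler argument of pipeline-blocking, and async/split routes errors away from the success path. I use async/merge for the error side here only because it closes the combined channel once both stages are done; re-using a single error channel works too, but then you have to close it yourself.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the real HTTP call (an assumption for this sketch).
;; A real implementation should also set a timeout on the request.
(defn run-computation
  [{:keys [id value] :as input}]
  (cond
    ;; simulate a library call that throws; ex-data carries the id
    (= value 13) (throw (ex-info "simulated failure" {:id id}))
    ;; simulate a response the business logic treats as an error
    (neg? value) (assoc input :status "error" :reason "negative value")
    :else        (assoc input :status "ok" :value (inc value))))

(def xf-run-computation (map run-computation))

(defn ok?
  "True for results that are not marked as errors."
  [result]
  (not= "error" (:status result)))

(defn ex->error
  "ex-handler: turns a Throwable into an error map. The non-nil return
   value is placed on the pipeline's output channel."
  [^Throwable t]
  (merge {:status "error" :reason (.getMessage t)} (ex-data t)))

(defn two-stage-pipeline
  "Runs input through two pipeline stages. Returns [results errors]."
  [input]
  (let [first-out    (async/chan 8)
        last-out     (async/chan 8)
        ;; split returns [channel-for-truthy channel-for-falsey]
        [ok-1 err-1] (async/split ok? first-out 8 8)
        [ok-2 err-2] (async/split ok? last-out 8 8)]
    (async/pipeline-blocking 4 first-out xf-run-computation input true ex->error)
    ;; only successful results of stage one reach stage two
    (async/pipeline-blocking 4 last-out xf-run-computation ok-1 true ex->error)
    [ok-2 (async/merge [err-1 err-2])]))

(defn run-demo
  "Feeds five sample items through both stages and collects the output."
  []
  (let [items [{:id 1 :value 1} {:id 2 :value -5} {:id 3 :value 13}
               {:id 4 :value 12} {:id 5 :value 3}]
        input (async/chan 16)]
    (doseq [item items] (async/>!! input item))
    (async/close! input)
    (let [[results errors] (two-stage-pipeline input)
          ;; start draining both channels before blocking on either one
          results-ch (async/into [] results)
          errors-ch  (async/into [] errors)]
      {:results (async/<!! results-ch)
       :errors  (async/<!! errors-ch)})))
```

Calling (run-demo) should give you the items with ids 1 and 5 under :results (pipeline preserves input order) and the items with ids 2, 3 and 4 under :errors. The order of the errors is not guaranteed, because they come from two channels that are merged.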