I am trying to run a multi-step job using lemur + Clojure.
I have an issue passing multiple inputs as arguments to Clojure + lemur.
As the first step of my job, I am trying to run an EMR streaming job:
lemur run ${CONF_DIR}/run-pipeline.clj --master-instance-type ${MASTER_INSTANCE_TYPE} --slave-instance-type ${SLAVE_INSTANCE_TYPE} --num-instances ${NUM_INSTANCES} --ami-version ${AMI_VERSION} --hadoop-version ${HADOOP_VERSION} --bucket ${BUCKET} --jar-src-path ${CONF_DIR}/run-pipeline.clj --input_folder "${input_folder}" --output-folder "${output_folder}" --reduce-tasks "${REDUCE_TASKS}" --map-tasks "${MAP_TASKS}"
With a single input file, my code looks like this:
(import com.amazonaws.services.elasticmapreduce.util.StepFactory)
(import com.amazonaws.services.elasticmapreduce.model.StepConfig)
(import com.amazonaws.services.elasticmapreduce.util.StreamingStep)
(defn create-step-parsing [eopts]
(def step (new StreamingStep))
(.withInputs step (into-array [(str (:input-folder eopts) "/inputs/*")]))
...
This works fine, but when I try to pass a list of input files, I get an error:
lemur run ${CONF_DIR}/run-pipeline.clj --master-instance-type ${MASTER_INSTANCE_TYPE} --slave-instance-type ${SLAVE_INSTANCE_TYPE} --num-instances ${NUM_INSTANCES} --ami-version ${AMI_VERSION} --hadoop-version ${HADOOP_VERSION} --bucket ${BUCKET} --jar-src-path ${CONF_DIR}/run-pipeline.clj --input_folder "${input_folder1}" --input_folder "${input_folder2}" --input_folder "${input_folder3}" --input_folder "${input_folder}" --output-folder "${output_folder}" --reduce-tasks "${REDUCE_TASKS}" --map-tasks "${MAP_TASKS}"
(defn create-normalizer-step [eopts]
(def step (new StreamingStep))
(.withInputs step (to-array (:input-folder eopts)))
Here is the error I am getting:
15:44:05 Exception in thread "main" java.lang.ClassCastException
15:44:05 at java.lang.Class.cast(Class.java:2990)
15:44:05 at clojure.lang.Reflector.boxArg(Reflector.java:429)
15:44:05 at clojure.lang.Reflector.boxArgs(Reflector.java:462)
15:44:05 at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:57)
15:44:05 at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:30)
15:44:05 at run_pipeline17$create_normalizer_step.invoke(run-pipeline.clj:18)
15:44:05 at run_pipeline17$run_pipeline.invoke(run-pipeline.clj:209)
15:44:05 at lemur.core$fire_BANG_.doInvoke(core.clj:711)
15:44:05 at clojure.lang.RestFn.invoke(RestFn.java:423)
15:44:05 at run_pipeline17$eval178.invoke(run-pipeline.clj:222)
15:44:05 at clojure.lang.Compiler.eval(Compiler.java:6465)
15:44:05 at clojure.lang.Compiler.load(Compiler.java:6902)
15:44:05 at clojure.lang.Compiler.loadFile(Compiler.java:6863)
15:44:05 at clojure.lang.RT$3.invoke(RT.java:305)
15:44:05 at lemur.core$execute_jobdef.invoke(core.clj:742)
15:44:05 at lemur.core$_main$fn__1388.invoke(core.clj:929)
15:44:05 at lemur.core$_main.doInvoke(core.clj:924)
15:44:05 at clojure.lang.RestFn.applyTo(RestFn.java:137)
15:44:05 at lemur.core.main(Unknown Source)
The code I added is from line 17 to line 19.
Thanks
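I believe the issue is with the array functions you're using. In the first, working example you use (into-array).
From the docs: (into-array) returns an array whose component type is the given type if one is provided, or otherwise the type of the first value in the sequence, or Object.
Because the values you passed to (into-array) were strings, Clojure created a String array for you.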
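In your second, broken example you use (to-array). From the docs: (to-array) returns an array of Objects containing the contents of the collection.
So in this case you created an Object array. That is not type-compatible with the String array that the AWS Java API for StreamingStep expects, which is why the reflective call to withInputs fails with a ClassCastException.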
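You can check the difference at a REPL by inspecting the array classes. This is only a minimal sketch: the paths are made-up placeholders, not values from your job.

```clojure
;; Hypothetical input paths, for illustration only.
(def paths ["s3://example-bucket/a" "s3://example-bucket/b"])

;; to-array always produces an Object[], whatever the elements are.
(def obj-arr (to-array paths))

;; Without an explicit type, into-array takes the component type
;; from the first element, which is a String here.
(def str-arr (into-array paths))

(class obj-arr) ; the class named [Ljava.lang.Object;
(class str-arr) ; the class named [Ljava.lang.String;
```

Only the second array is an instance of the String array class, so only it can be passed where a String array is required.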
My recommendation is to use (into-array) with an explicit String type, for example: (into-array String ["hello" "goodbye"]). Explicit typing is nice, in my opinion, for clarity when reading the code later. But as the docs say, (into-array) will infer the type for you.
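One more reason to spell out the type: with an empty sequence there is no first element to infer from, so (into-array) falls back to Object. Below is a rough sketch of a small helper that always yields a String array. The name ->string-array is hypothetical, and it assumes the option value is either a single string or a collection of strings; I don't know how lemur actually delivers a repeated option, so check that first.

```clojure
;; Hypothetical helper (not part of lemur or the AWS SDK):
;; coerce an option value to a String[].
;; Assumes v is a single string or a collection of strings.
(defn ->string-array [v]
  (into-array String (if (string? v) [v] v)))

(->string-array "in/a")           ; String[] with one element
(->string-array ["in/a" "in/b"])  ; String[] with two elements
(->string-array [])               ; empty String[], not Object[]
```

If that assumption holds for your options, the failing call could then be written as (.withInputs step (->string-array (:input-folder eopts))).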