Clojure + Lemur

I am trying to run a multi-step job using Lemur and Clojure.

I have an issue passing multiple inputs as arguments to my Clojure + Lemur job.

As the first step of my job, I am trying to run an EMR streaming job:

lemur run ${CONF_DIR}/run-pipeline.clj --master-instance-type ${MASTER_INSTANCE_TYPE} --slave-instance-type ${SLAVE_INSTANCE_TYPE} --num-instances ${NUM_INSTANCES} --ami-version ${AMI_VERSION} --hadoop-version ${HADOOP_VERSION} --bucket ${BUCKET} --jar-src-path ${CONF_DIR}/run-pipeline.clj --input_folder "${input_folder}" --output-folder "${output_folder}" --reduce-tasks "${REDUCE_TASKS}" --map-tasks "${MAP_TASKS}"

With a single input file, my code looks like this:

(import com.amazonaws.services.elasticmapreduce.util.StepFactory)
(import com.amazonaws.services.elasticmapreduce.model.StepConfig)
(import com.amazonaws.services.elasticmapreduce.util.StreamingStep)


(defn create-step-parsing [eopts]
 (def step (new StreamingStep))
 (.withInputs step (into-array [(str (:input-folder eopts) "/inputs/*")]))
 ...

This works fine, but when I try to pass a list of input files, I get an error:

lemur run ${CONF_DIR}/run-pipeline.clj --master-instance-type ${MASTER_INSTANCE_TYPE} --slave-instance-type ${SLAVE_INSTANCE_TYPE} --num-instances ${NUM_INSTANCES} --ami-version ${AMI_VERSION} --hadoop-version ${HADOOP_VERSION} --bucket ${BUCKET} --jar-src-path ${CONF_DIR}/run-pipeline.clj --input_folder "${input_folder1}" --input_folder "${input_folder2}" --input_folder "${input_folder3}" --input_folder "${input_folder}" --output-folder "${output_folder}" --reduce-tasks "${REDUCE_TASKS}" --map-tasks "${MAP_TASKS}"

(defn create-normalizer-step [eopts]
  (def step (new StreamingStep))
  (.withInputs step (to-array (:input-folder eopts)))

Here is the error I am getting:

15:44:05 Exception in thread "main" java.lang.ClassCastException
15:44:05    at java.lang.Class.cast(Class.java:2990)
15:44:05    at clojure.lang.Reflector.boxArg(Reflector.java:429)
15:44:05    at clojure.lang.Reflector.boxArgs(Reflector.java:462)
15:44:05    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:57)
15:44:05    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:30)
15:44:05    at run_pipeline17$create_normalizer_step.invoke(run-pipeline.clj:18)
15:44:05    at run_pipeline17$run_pipeline.invoke(run-pipeline.clj:209)
15:44:05    at lemur.core$fire_BANG_.doInvoke(core.clj:711)
15:44:05    at clojure.lang.RestFn.invoke(RestFn.java:423)
15:44:05    at run_pipeline17$eval178.invoke(run-pipeline.clj:222)
15:44:05    at clojure.lang.Compiler.eval(Compiler.java:6465)
15:44:05    at clojure.lang.Compiler.load(Compiler.java:6902)
15:44:05    at clojure.lang.Compiler.loadFile(Compiler.java:6863)
15:44:05    at clojure.lang.RT$3.invoke(RT.java:305)
15:44:05    at lemur.core$execute_jobdef.invoke(core.clj:742)
15:44:05    at lemur.core$_main$fn__1388.invoke(core.clj:929)
15:44:05    at lemur.core$_main.doInvoke(core.clj:924)
15:44:05    at clojure.lang.RestFn.applyTo(RestFn.java:137)
15:44:05    at lemur.core.main(Unknown Source)

The code I added is from line 17 to line 19.

Thanks

There are 1 best solutions below


I believe the issue is with the array functions you're using. In the first working example you are using (into-array).

From the docs:

(into-array aseq)

(into-array type aseq)

Returns an array with components set to the values in aseq. The array's component type is type if provided, or the type of the first value in aseq if present, or Object. All values in aseq must be compatible with the component type. Class objects for the primitive types can be obtained using, e.g., Integer/TYPE.

Because the elements you passed to (into-array) were strings, Clojure inferred the component type from the first element and created a String[] array for you.

In your second, broken example, you use (to-array). From the docs:

(to-array coll)

Returns an array of Objects containing the contents of coll, which can be any Collection. Maps to java.util.Collection.toArray().

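In this case you created an Object[] array. Per the AWS Java API for StreamingStep, withInputs expects an array of String, and an Object[] cannot be cast to String[] even when every element in it is a string. That failed cast is the ClassCastException raised from Reflector.boxArg in your stack trace.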
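To see the difference without an EMR cluster, here is a minimal sketch that can be pasted into a REPL. The S3 paths are made up, component-type and try-varargs are hypothetical helpers, and java.nio.file.Paths/get is only a stand-in for a Java method with a String... parameter; it is not part of the AWS API.

```clojure
;; Hypothetical input paths, for illustration only.
(def inputs ["s3://example-bucket/in1" "s3://example-bucket/in2"])

;; Hypothetical helper: the element type a Java array was created with.
(defn component-type [arr]
  (.getComponentType (class arr)))

;; to-array always builds an Object[], whatever the elements are.
(def as-objects (to-array inputs))

;; into-array without a type infers it from the first element (String here).
(def inferred (into-array inputs))

;; into-array with an explicit type does not depend on the contents.
(def explicit (into-array String inputs))

;; With an empty collection the inferred type falls back to Object,
;; while the explicit form still yields a String[].
(def empty-inferred (into-array []))
(def empty-explicit (into-array String []))

;; Stand-in for a String... method: Paths/get(String first, String... more).
;; Passing an Object[] where a String[] is required fails the cast.
(defn try-varargs [arr]
  (try
    (str (java.nio.file.Paths/get "base" arr))
    (catch ClassCastException _ :class-cast-exception)))

(println (component-type as-objects))
(println (component-type inferred))
(println (try-varargs as-objects))
(println (try-varargs explicit))
```

The same cast is what fails inside the reflective call to withInputs in the question.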

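My recommendation is to use (into-array) with an explicit String type, for example (into-array String ["hello" "goodbye"]). In your code that would be (.withInputs step (into-array String (:input-folder eopts))), assuming (:input-folder eopts) is a collection of strings. The explicit type is nice, in my opinion, for clarity when someone reads the code later, and it still produces a String[] if the collection happens to be empty. As the docs say, (into-array) without a type will infer the component type from the first element.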
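The question does not show what Lemur puts in (:input-folder eopts) when an option is repeated, so the following is only a sketch under the assumption that it may be either a single string or a collection of strings. The helper name ->string-array is hypothetical.

```clojure
(defn ->string-array
  "Hypothetical helper: coerce a single string or a collection of
  strings into a String[] suitable for a String... Java parameter."
  [x]
  (into-array String (if (string? x) [x] x)))

;; Intended use in the step definition (needs the AWS SDK on the classpath,
;; so it is left commented out here):
;; (.withInputs step (->string-array (:input-folder eopts)))

(def single (->string-array "s3://example-bucket/in1"))
(def several (->string-array ["s3://example-bucket/in1"
                              "s3://example-bucket/in2"]))
```

Both results are String[] arrays, so the call to withInputs would receive the same array type whether one input or several were given.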