How to use akka dataflow in maven

175 Views Asked by At

I encountered a problem when using data flow . The maven can not compile if I enable the continuation in maven scala plugin config.

After I enable the continuation in maven, the compile starts to fail. And the error message is very confusing.

Like this:

found : Unit @scala.util.continuations.cpsSynth @util.continuations.cps[scala.concurrent.Future[Any]]

required: Unit

Or this:

found : ((Class[_ <: Indexed[String]], scala.concurrent.Promise[List[_ <: Indexed[String]]])) => (Class[$1(in value $anonfun)], List[ <: Indexed[String]]) @scala.util.continuations.cpsSynth @util.continuations.cps[scala.concurrent.Future[Any]] forSome { type _$1(in value $anonfun) <: Indexed[String] }

required: ((Class[_ <: Indexed[String]], scala.concurrent.Promise[List[_ <: Indexed[String]]])) => B

All these error occurs at the closure . It seems that the closure can not be used in flow context.

Any one knows why ?

Error code:

def toList[T <: Indexed[String]](content: Iterable[String], klass: Class[T]) = {
    val ref = new TypeReference[List[T]] {}
    content.flatMap(JsonUtils.toObject(_, ref)).toList
}

flow {
    val objects = toList(content, klass)
    val hasRefs = klass.getFields.filter(it => it.getAnnotation(classOf[Ref]) != null)
    hasRefs.foreach {injectRef(_, objects)}
    promise << objects
}

def injectRef[T <: Indexed[String]](field: Field, objects: List[T]) {
    val anno = field.getAnnotation(classOf[Ref])
    val refKlass = anno.klass()

    def setField(o: T) {
        val original = field.isAccessible
        field.setAccessible(true)
        val f = field.get(o)
        val found = promises.get(refKlass)().find(it => it.index() == f)
        field.set(o, found)
        field.setAccessible(original)
    }
    objects.foreach { o =>
        setField(o) // here, compile error
    }
}
0

There are 0 best solutions below