How to create a play.api.libs.iteratee.Enumerator which inserts some data between the items of a given Enumerator?


I use the Play framework with ReactiveMongo. Most of the ReactiveMongo APIs are based on the Play Enumerator. As long as I fetch some data from MongoDB and return it "as-is" asynchronously, everything is fine. Transforming the data, for example converting BSON to String with Enumerator.map, is also straightforward.

But today I faced a problem which I narrowed down to the code below. I wasted half a day trying to create an Enumerator which would consume items from a given Enumerator and insert some items between them. It is important not to load all the items at once, as there could be many of them (the code example has only two items, "1" and "2"). Semantically it is similar to mkString on collections. I am sure it can be done very easily, but the code below is the best I could come up with. Very similar code that creates an Enumerator using Concurrent.broadcast serves me well for WebSockets, but here even that does not work: the HTTP response never comes back. Looking at Enumeratee, it seems that it is supposed to provide this kind of functionality, but I could not find a way to do the trick.

P.S. I tried calling chan.eofAndEnd in Iteratee.mapDone, and using chunked(enums >>> Enumerator.eof) instead of chunked(enums); neither helped. Sometimes the response comes back, but it does not contain the correct data. What am I missing?

def trans(in:Enumerator[String]):Enumerator[String] = {
  val (res, chan) = Concurrent.broadcast[String]

  val iter = Iteratee.fold(true) { (isFirst, curr:String) =>
    if (!isFirst)
      chan.push("<-------->")
    chan.push(curr)
    false
  }

  in.apply(iter)

  res
}

def enums:Enumerator[String] = {
  val en12 = Enumerator[String]("1", "2")

  trans(en12)
  //en12 //if I comment the previous line and uncomment this, it prints "12" as expected
}

def enum = Action {
  Ok.chunked(enums)
}
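
For comparison, the mkString-like behaviour described above can be sketched on a plain Scala Iterator, without Play. This is only an illustration of the intended semantics (separator between items, nothing before the first or after the last, input consumed lazily); the object and method names are made up for this example.

```scala
object LazyIntersperse {
  // Lazily yields a, sep, b, sep, c, ... without materialising the input.
  // The index is used only to skip the separator before the first item.
  def intersperse[A](in: Iterator[A], sep: A): Iterator[A] =
    in.zipWithIndex.flatMap {
      case (a, 0) => Iterator(a)
      case (a, _) => Iterator(sep, a)
    }
}
```

For the two-item input from the question, LazyIntersperse.intersperse(Iterator("1", "2"), "<-------->") yields "1", "<-------->", "2".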

There are 2 best solutions below


You wrote that you are working with WebSockets, so why don't you use a dedicated solution for that? What you wrote is better suited to Server-Sent Events than to WebSockets. As I understand it, you want to transform your results before sending them back to the client? If that is correct, then you need an Enumeratee instead of an Enumerator. An Enumeratee is a transformation from one stream type to another. This is a very good piece of code showing how to use Enumeratee. It may not be directly about what you need, but I found inspiration there for my project. Maybe if you analyze that code you will find the best solution.
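
As a rough illustration of the Enumeratee approach, here is a minimal sketch for the separator case from the question. It assumes the Play 2.x iteratee library and relies on Enumeratee.mapConcat, Enumeratee.drop and the ><> composition operator; the object name Intersperse and the method name intersperse are made up for this example. The idea is to put the separator in front of every item and then drop the very first element, so nothing is buffered.

```scala
import play.api.libs.iteratee.Enumeratee
import scala.concurrent.ExecutionContext

object Intersperse {
  // Emits (sep, item) for every incoming item, then drops the leading sep,
  // so the output is item1, sep, item2, sep, item3, ...
  def intersperse(sep: String)(implicit ec: ExecutionContext): Enumeratee[String, String] =
    Enumeratee.mapConcat[String](s => Seq(sep, s)) ><> Enumeratee.drop[String](1)
}
```

With this in scope, the action from the question could presumably return Ok.chunked(en12 &> Intersperse.intersperse("<-------->")) and skip Concurrent.broadcast entirely.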


Here is my solution which I believe to be correct for this type of problem. Comments are welcome:

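import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.{ExecutionContext, Future}

def fill[From](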
    prefix: From => Enumerator[From],
    infix: (From, From) => Enumerator[From],
    suffix: From => Enumerator[From]
    )(implicit ec:ExecutionContext) = new Enumeratee[From, From] {
  override def applyOn[A](inner: Iteratee[From, A]): Iteratee[From, Iteratee[From, A]] = {
    //type of the state we will use for fold
    case class State(prev:Option[From], it:Iteratee[From, A])

    Iteratee.foldM(State(None, inner)) { (prevState, newItem:From) =>
      val toInsert = prevState.prev match {
        case None => prefix(newItem)
        case Some(prevItem) => infix (prevItem, newItem)
      }

      for(newIt <- toInsert >>> Enumerator(newItem) |>> prevState.it)
        yield State(Some(newItem), newIt)

    } mapM {
      case State(None, it) => //this is possible when our input was empty
        Future.successful(it)
      case State(Some(lastItem), it) =>
        suffix(lastItem) |>> it
    }
  }
}

// if there are missing integers between from and to, fill that gap with 0
def fillGap(from:Int, to:Int)(implicit ec:ExecutionContext) = Enumerator enumerate List.fill(to-from-1)(0)
def fillFrom(x:Int)(input:Int)(implicit ec:ExecutionContext) = fillGap(x, input)
def fillTo(x:Int)(input:Int)(implicit ec:ExecutionContext) = fillGap(input, x)

val ints = Enumerator(10, 12, 15)
val toStr = Enumeratee.map[Int] (_.toString)

val infill = fill(
  fillFrom(5),
  fillGap,
  fillTo(20)
)

val res = ints &> infill &> toStr // res will have 0,0,0,0,10,0,12,0,0,15,0,0,0,0