Processing jobs from capped collection until interrupted using ReactiveMongo

220 Views Asked by At

I have a jobs_queue collection in MongoDB. It's a capped collection which I'm polling using a tailable cursor:

val cur =
  jobsQueue
    .find(Json.obj("done" -> Json.obj("$ne" -> true)))
    .options(QueryOpts().tailable.awaitData)
    .cursor[JsObject]

cur.enumerate() |>>> Iteratee.foreach { queuedDoc =>
  // do some processing and store the results back in the DB
}

This is being called from a regular Scala App, so there's no Akka or Play wrapping at all.

What would be the most appropriate way to make sure the App doesn't exit until I explicitly break out of the Iteratee.foreach? Also, I don't have to use play-iteratees at all if there's a simpler (even if slightly less elegant) way.


P.S. I do ensure the collection is capped:

val jobsQueueMaybe = db.collection[JSONCollection]("jobs_queue")
val jobsQueue: JSONCollection =
  jobsQueueMaybe.stats()
    .flatMap {
      case stats if !stats.capped =>
        jobsQueueMaybe.convertToCapped(size = 1024 * 1024, maxDocuments = None)
      case _ =>
        Future(jobsQueueMaybe)
    }
    .recover { case _ => jobsQueueMaybe.createCapped(size = 1024 * 1024, maxDocuments = None) }
    .map { _ => jobsQueueMaybe }

P.P.S.

I will also appreciate any criticism as to how I've designed this bit of logic, and how I could solve this by rethinking my approach and slightly overhauling the implementation.

1

There are 1 best solutions below

0
On

As the current workaround, I changed from Iteratee.foreach to Iteratee.foldM so that each iteration returns a Future; this way it seems that it forces ReactiveMongo to continue the computation until interrupted, as opposed to foreach that just seems to exit too early:

cur.enumerate() |>>> Iteratee.foldM(()) { (acc, queuedDoc) =>
  // always yield something like Future.successful(acc) or an actual `Future[Unit]`
}

then, I just need to wait until the entire program is terminated (which is signalled by something being put into a stopSignal: ConcurrentLinkedQueue:

while (stopSignal.isEmpty) Thread.sleep(1000)

But, while it works well, I'm not particularly in love with that solution.

Maybe my fears are unjustified, but I'd really like a somewhat more authoritative answer as to how I should be solving this.