Idiomatic way to handle multiple concurrent streams in Scala


I have a list of streams, each of which, when next() is called, sleeps for a random amount of time and then reads one char from a separate source.

I am trying to write consumers that keep calling these streams until EOF and build a common dictionary from them at runtime.

So far, I am using a ConcurrentHashMap for the dictionary and simply creating a new thread for each of the stream consumers.

While my solution works, it seems very naive, and I wonder whether a streaming library such as Monix or fs2 would offer a better approach.
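A minimal sketch of the thread-per-consumer approach described above, assuming each stream is exposed as an Iterator[Char] and that the dictionary counts how often each char is read (the question does not say what the keys are, so both are assumptions). ThreadPerSourceCounter and countChars are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._

object ThreadPerSourceCounter {
  // Hypothetical: each stream is an Iterator[Char], and the shared dictionary
  // maps every char seen to the number of times it was read.
  def countChars(sources: Seq[Iterator[Char]]): Map[Char, Int] = {
    val counts = new ConcurrentHashMap[Char, AtomicInteger]()

    // One consumer thread per source, each reading until EOF (hasNext == false)
    val threads = sources.map { src =>
      new Thread(() =>
        src.foreach { c =>
          // putIfAbsent returns the existing counter, or null if `fresh` was inserted
          val fresh = new AtomicInteger(0)
          val existing = counts.putIfAbsent(c, fresh)
          (if (existing == null) fresh else existing).incrementAndGet()
        }
      )
    }

    threads.foreach(_.start())
    threads.foreach(_.join()) // block until every consumer has finished

    counts.asScala.map { case (c, n) => c -> n.get }.toMap
  }
}
```

Each source occupies one thread for its whole lifetime, which is what makes this approach feel heavyweight once there are many sources.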

Best answer

Based on the description in the question and the subsequent comments, I am assuming that there exist multiple Iterator[Char] sources:

val allSources : Iterable[Iterator[Char]] = ???
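To experiment with this answer without the real streams, a hypothetical SlowCharSource can mimic the behaviour described in the question: each next() sleeps for a random interval and then returns one char. The class, the sample strings and the delay bound are all assumptions made for illustration.

```scala
import scala.util.Random

// Hypothetical stand-in for one of the question's streams: each next() sleeps
// for a random 0..maxDelayMillis ms, then returns the next char of `data`.
final class SlowCharSource(data: String, maxDelayMillis: Int) extends Iterator[Char] {
  require(maxDelayMillis >= 0, "delay bound must be non-negative")
  private val underlying = data.iterator

  // EOF is reached once every char of `data` has been returned
  override def hasNext: Boolean = underlying.hasNext

  override def next(): Char = {
    Thread.sleep(Random.nextInt(maxDelayMillis + 1).toLong)
    underlying.next()
  }
}

object ExampleSources {
  // Sample data; a def so that each call yields fresh, unconsumed iterators
  def allSources: Iterable[Iterator[Char]] =
    List(new SlowCharSource("hello world ", 5), new SlowCharSource("hello scala ", 5))
}
```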

The question, then, is how to concurrently collect the String values (words) from these Iterators into a mapping from each String to its count.

Stream-Based Solution

First, we need to convert each Iterator into an Iterator of String values, splitting on a separator:

trait Word {
  val data : String
}

object EmptyWord extends Word {
  override val data = ""
}

case class PartialWord(val data : String) extends Word

case class WholeWord(val data : String) extends Word

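val appendToWord : Char => (Word, Char) => Word = 
  (separator) => (originalWord, appendChar) => originalWord match {
    // a separator ends the word currently being built
    case PartialWord(d) if appendChar == separator => WholeWord(d)
    // any other char extends it
    case PartialWord(d)                            => PartialWord(d + appendChar)
    // a separator with no word in progress (leading or repeated) is skipped
    case _ if appendChar == separator              => EmptyWord
    // otherwise the char starts a new word
    case _                                         => PartialWord(appendChar.toString)
  }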

val isWholeWord : Word => Boolean = {
  case _ : WholeWord => true
  case _             => false
}

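// Using space as the separator. EmptyWord is ascribed the type Word so that
// scanLeft infers Word, not EmptyWord.type, as its accumulator type.
// A final word that is not followed by a separator is never emitted.
val convertCharIterator : Iterator[Char] => Iterator[String] = 
  chars => chars
    .scanLeft(EmptyWord : Word)(appendToWord(' '))
    .filter(isWholeWord)
    .map(_.data)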
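To see what the splitter emits, here is a self-contained sketch of it. It includes a guard so that leading or repeated separators are skipped rather than starting a new word, and it uses collect in place of filter plus map. WordSplitDemo is a hypothetical name.

```scala
object WordSplitDemo {
  sealed trait Word { def data: String }
  case object EmptyWord extends Word { val data = "" }
  final case class PartialWord(data: String) extends Word
  final case class WholeWord(data: String) extends Word

  // State machine: a separator closes a PartialWord; other chars extend or start one
  def appendToWord(separator: Char)(word: Word, c: Char): Word = word match {
    case PartialWord(d) if c == separator => WholeWord(d)
    case PartialWord(d)                   => PartialWord(d + c)
    case _ if c == separator              => EmptyWord // skip leading/repeated separators
    case _                                => PartialWord(c.toString)
  }

  // Keep only the completed words; scanLeft also emits the initial EmptyWord
  def convertCharIterator(chars: Iterator[Char]): Iterator[String] =
    chars
      .scanLeft(EmptyWord: Word)(appendToWord(' '))
      .collect { case WholeWord(d) => d }
}
```

For example, "hello  world foo" yields hello and world; foo is dropped because no separator follows it, so a source that does not end with a separator can have one appended first (for instance chars ++ Iterator(' ')).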

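We can now convert each source into an Iterator[String] and concatenate them into a single Iterator. Note that ++ is lazy but sequential: the combined Iterator reads the first source to EOF before touching the next one, so the sources are not consumed concurrently at this step: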

val allWordSource : Iterator[String] = 
  allSources.map(convertCharIterator)
            .reduceOption( _ ++ _)
            .getOrElse(Iterator.empty[String])
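To drain the sources concurrently, one option (a swapped-in technique using plain scala.concurrent Futures rather than a streaming library) is to count each source in its own Future and then merge the partial maps. A minimal sketch, assuming each source has already been turned into an Iterator[String]; ConcurrentWordCount, countLocal and mergeCounts are hypothetical names.

```scala
import scala.concurrent.{blocking, ExecutionContext, Future}

object ConcurrentWordCount {
  // Count the words of one source sequentially into a local, immutable map
  private def countLocal(words: Iterator[String]): Map[String, Int] =
    words.foldLeft(Map.empty[String, Int]) { (counts, w) =>
      counts.updated(w, counts.getOrElse(w, 0) + 1)
    }

  // Combine two partial maps by summing the counts of shared words
  private def mergeCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (w, n)) => acc.updated(w, acc.getOrElse(w, 0) + n) }

  // Drain each source in its own Future. With the global ExecutionContext,
  // `blocking` lets the pool add threads while a source sleeps in next().
  def countAll(sources: Iterable[Iterator[String]])(implicit ec: ExecutionContext): Future[Map[String, Int]] =
    Future
      .traverse(sources.toList)(src => Future(blocking(countLocal(src))))
      .map(_.foldLeft(Map.empty[String, Int])(mergeCounts))
}
```

With the words from convertCharIterator this would be called as countAll(allSources.map(convertCharIterator)), given an implicit ExecutionContext such as ExecutionContext.global.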

This Iterator can now serve as the source of an Akka Stream that computes the word counts:

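import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.Future

// Assuming Akka 2.6+, where an implicit ActorSystem is enough to materialize a stream
implicit val system : ActorSystem = ActorSystem("word-count")

val addToCounter : (Map[String, Int], String) => Map[String, Int] = 
  (counter, word) => 
    counter.updated(word, counter.getOrElse(word, 0) + 1)

val counter : Future[Map[String, Int]] = 
  Source
    .fromIterator( () => allWordSource)
    .runFold(Map.empty[String, Int])(addToCounter)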
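runFold applies addToCounter the same way an ordinary foldLeft does, so the counting step can be checked on a plain Iterator without starting an ActorSystem. A minimal sketch; CounterFoldCheck is a hypothetical name.

```scala
object CounterFoldCheck {
  // Same counting step as addToCounter: increment the word's count, starting at 0
  val addToCounter: (Map[String, Int], String) => Map[String, Int] =
    (counter, word) => counter.updated(word, counter.getOrElse(word, 0) + 1)

  // Sequential, single-threaded counterpart of runFold(Map.empty)(addToCounter)
  def count(words: Iterator[String]): Map[String, Int] =
    words.foldLeft(Map.empty[String, Int])(addToCounter)
}
```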