I'm trying to implement an NNTP client that streams a list of commands to the server and parses the results back. I'm facing several problems:
- The NNTP protocol doesn't have a unique delimiter that could be used to frame results. Some commands return multi-line responses. How can I handle that with streams?
- How can I map each command issued to its server response, and wait for the end of that response before sending the next command? (Throttling is not relevant here.)
- How can I stop the stream processing on disconnection? (Currently, the program never returns.)
Here is my current implementation:
import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths
import scala.io.StdIn
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure
object AutomatedClient extends App {
  implicit val system = ActorSystem("NewsClientTest")
  implicit val materializer = ActorMaterializer()
  // MODEL //
  final case class Command(query: String)
  final case class CommandResult(
    resultCode: Int,
    resultStatus: String,
    resultList: Option[List[String]])
  // Pass the message on to RuntimeException so that getMessage returns it
  final case class ParseException(message: String) extends RuntimeException(message)
  // COMMAND HANDLING FUN //
  // out ->
  val sendCommand: Command => ByteString = c => ByteString(c.query + "\r\n")
  // in <-
  val parseCommandResultStatus: String => (Int, String) = s =>
    (s.take(3).toInt, s.drop(3).trim)
  val parseCommandResultList: List[String] => List[String] = l =>
    l.foldLeft(List.empty[String]) {
      case (acc, ".") => acc
      case (acc, e) => e.trim :: acc
    }.reverse
  val parseCommandResult: ByteString => Future[CommandResult] = b => Future {
    val resultLines = b.decodeString("UTF-8").split("\r\n")
    resultLines.length match {
      case 0 => throw ParseException("empty result")
      case 1 =>
        val (code, text) = parseCommandResultStatus(resultLines.head)
        CommandResult(code, text, None)
      case _ =>
        val (code, text) = parseCommandResultStatus(resultLines.head)
        CommandResult(code, text, Some(parseCommandResultList(resultLines.tail.toList)))
    }
  }
  // STREAMS //
  // Flows
  val outgoing: Flow[Command, ByteString, NotUsed] = Flow fromFunction sendCommand
  val incoming: Flow[ByteString, Future[CommandResult], NotUsed] = Flow fromFunction parseCommandResult
  val protocol = BidiFlow.fromFlows(incoming, outgoing)
  // Sink
  val print: Sink[Future[CommandResult], _] = Sink.foreach(f =>
    f.onComplete {
      case Success(r) => println(r)
      case Failure(_) => println("error decoding command result")
    })
  // Source
  val testSource: Source[Command, NotUsed] = Source(List(
    Command("help"),
    Command("list"),
    Command("quit")
  ))
  val (host, port) = ("localhost", 1119)
  Tcp()
    .outgoingConnection(host, port)
    .join(protocol)
    .runWith(testSource, print)
}
And here is the resulting output:
CommandResult(200,news.localhost NNRP Service Ready - newsmaster@localhost (posting ok),None)
CommandResult(100,Legal Commands,Some(List(article [<messageid>|number], authinfo type value, body [<messageid>|number], date, group newsgroup, head [<messageid>|number], help, last, list [active wildmat|active.times|counts wildmat], list [overview.fmt|newsgroups wildmat], listgroup newsgroup, mode reader, next, post, stat [<messageid>|number], xhdr field [range], xover [range], xpat field range pattern, xfeature useragent <client identifier>, xfeature compress gzip [terminator], xzver [range], xzhdr field [range], quit, 480 Authentication Required*, 205 Goodbye)))
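We can see that the second CommandResult, which should only hold the response to the "help" command, also contains the responses to the "list" command (480 Authentication Required) and the "quit" command (205 Goodbye).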
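My guess is that this happens because each ByteString coming from the TCP connection is an arbitrary chunk of bytes rather than one complete response, so the stream first has to be split into lines (for instance with `Framing.delimiter(ByteString("\r\n"), ...)`) and the lines then grouped into responses. Below is a rough sketch of that grouping step in plain Scala, without any Akka types. `ResponseFramer`, `Response` and `multiLineCodes` are hypothetical names that are not part of the code above, and the set of multi-line status codes is an assumption: the real set depends on the server and on which command was sent.

```scala
// Hypothetical helper: groups already-split lines (CRLF removed)
// into complete NNTP responses.
object ResponseFramer {
  // Assumption: status codes whose responses continue until a lone "." line.
  // The real set depends on the server and on the command that was issued.
  val multiLineCodes: Set[Int] = Set(100, 101, 215, 220, 221, 222, 224, 225, 230, 231)

  final case class Response(code: Int, status: String, body: List[String])

  // Returns the complete responses found in `lines`, plus the trailing
  // lines of a response whose terminator has not arrived yet.
  def frame(lines: List[String]): (List[Response], List[String]) = {
    @annotation.tailrec
    def loop(rest: List[String], done: List[Response]): (List[Response], List[String]) =
      rest match {
        case Nil => (done.reverse, Nil)
        case statusLine :: tail =>
          // Throws NumberFormatException on a malformed status line
          val code = statusLine.take(3).toInt
          val status = statusLine.drop(3).trim
          if (!multiLineCodes(code)) loop(tail, Response(code, status, Nil) :: done)
          else {
            val (body, afterBody) = tail.span(_ != ".")
            afterBody match {
              case Nil => (done.reverse, rest) // terminator not seen yet
              case _ :: next =>
                // Undo dot-stuffing: a body line starting with "." had an
                // extra "." prepended by the server
                val unstuffed = body.map(l => if (l.startsWith(".")) l.drop(1) else l)
                loop(next, Response(code, status, unstuffed) :: done)
            }
          }
      }
    loop(lines, Nil)
  }
}
```

In a stream, the pending lines returned as the second element would have to be kept and prepended to the next batch of lines, for example inside a stateful stage. This only addresses the framing problem; it does not by itself make the client wait for a response before sending the next command.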