NNTP client using akka streams in scala

107 Views Asked by At

I'm trying to implement an NNTP client that streams a list of commands to the server and parsing the results back. I'm facing several problems :

  • the NNTP protocol doesn't have an "unique" delimiter that could be use to frame results. Some commands return multi-line responses. How to handle that with streams ?
  • how to "map" the command issued with the server response and wait the end of server response before sending the next command ? (Throttling is not relevant here)
  • how to stop the stream processing on disconnection ? (Actually, the program never returns)

Here is my current implementation :

import akka.stream._
import akka.stream.scaladsl._

import akka.{ NotUsed, Done }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths
import scala.io.StdIn

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure


object AutomatedClient extends App {
  implicit val system = ActorSystem("NewsClientTest")
  implicit val materializer = ActorMaterializer()

  // MODEL //
  final case class Command(query: String)
  final case class CommandResult(
      resultCode: Int,
      resultStatus: String,
      resultList: Option[List[String]])
  final case class ParseException(message: String) extends RuntimeException

  // COMMAND HANDLING FUN //
  // out ->
  val sendCommand: Command => ByteString = c => ByteString(c.query + "\r\n")
  // in <-
  val parseCommandResultStatus: String => (Int, String) = s => 
    (s.take(3).toInt, s.drop(3).trim)
  val parseCommandResultList: List[String] => List[String] = l =>
    l.foldLeft(List().asInstanceOf[List[String]]){
      case (acc, ".") => acc
      case (acc, e) => e.trim :: acc
    }.reverse
  val parseCommandResult: ByteString => Future[CommandResult] = b => Future {
    val resultLines = b.decodeString("UTF-8").split("\r\n")
    resultLines.length match {
      case 0 => throw new ParseException("empty result")
      case 1 => 
        val (code, text) = parseCommandResultStatus(resultLines.head)
        new CommandResult(code, text, None)
      case _ =>
        val (code, text) = parseCommandResultStatus(resultLines.head)
        new CommandResult(code, text, Some(parseCommandResultList(resultLines.tail.toList)))
    }
  }

  // STREAMS //
  // Flows
  val outgoing: Flow[Command, ByteString, NotUsed] = Flow fromFunction sendCommand
  val incoming: Flow[ByteString, Future[CommandResult], NotUsed] = Flow fromFunction parseCommandResult
  val protocol = BidiFlow.fromFlows(incoming, outgoing)
  // Sink
  val print: Sink[Future[CommandResult], _] = Sink.foreach(f => 
    f.onComplete { 
      case Success(r) => println(r)
      case Failure(r) => println("error decoding command result")
    })
  // Source
  val testSource: Source[Command, NotUsed] = Source(List(
          new Command("help"),
          new Command("list"),
      new Command("quit")
  ))

  val (host, port) = ("localhost", 1119)
  Tcp()
    .outgoingConnection(host, port)
    .join(protocol)
    .runWith(testSource, print)

}

And here is the result output :

CommandResult(200,news.localhost NNRP Service Ready - newsmaster@localhost (posting ok),None)
CommandResult(100,Legal Commands,Some(List(article [<messageid>|number], authinfo type value, body [<messageid>|number], date, group newsgroup, head [<messageid>|number], help, last, list [active wildmat|active.times|counts wildmat], list [overview.fmt|newsgroups wildmat], listgroup newsgroup, mode reader, next, post, stat [<messageid>|number], xhdr field [range], xover [range], xpat field range pattern, xfeature useragent <client identifier>, xfeature compress gzip [terminator], xzver [range], xzhdr field [range], quit, 480 Authentication Required*, 205 Goodbye)))

We can see that the second CommandResult contains the result of "list" command and "quit" command.

0

There are 0 best solutions below