Akka Stream TCP + Akka Stream Kafka producer not stopping, not publishing messages, and not erroring out


I have the following stream:

Source(IndexedSeq(ByteString.empty))
.via(
  Tcp().outgoingConnection(bsAddress, bsPort)
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

It works fine for a while, and I can consume the messages published to the Kafka topic. But from time to time, at apparently random intervals, no more messages are published, and this code does not log any errors (printAndByeBye prints the message passed to it and terminates the actor system). After restarting the app, the messages flow again.

Any idea on how to know what is going on here?

Edit: I put Kamon on it and I could see the following behavior:

[Kamon charts: Mailbox Size per Actor, Time in Mailbox per Actor, Processing Time per Actor]

It looks like something stopped without signaling that the stream should stop, but I don't know how to make this explicit and stop the stream.


There are 3 best solutions below

0
On BEST ANSWER

The stream was not failing; the TCP stream went idle because the device publishing the data stopped sending after a while without dropping the connection. Instead of using the simpler:

Tcp().outgoingConnection(bsAddress, bsPort)

I ended up using the overload that takes timeouts:

def outgoingConnection(
remoteAddress:  InetSocketAddress,
localAddress:   Option[InetSocketAddress]           = None,
options:        immutable.Traversable[SocketOption] = Nil,
halfClose:      Boolean                             = true,
connectTimeout: Duration                            = Duration.Inf,
idleTimeout:    Duration                            = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ???

so

Tcp().outgoingConnection(bsAddress, bsPort)

became

import java.net.InetSocketAddress
import scala.concurrent.duration._

val connectTimeout: Duration = 1.second
val idleTimeout: Duration = 2.seconds
Tcp().outgoingConnection(
  remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
  connectTimeout = connectTimeout,
  idleTimeout = idleTimeout
)

With idleTimeout set, the flow fails when the connection goes idle, and a new flow can then be started.
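A minimal sketch of how such a restart could look, wrapping the connection in RestartSource.withBackoff. The object name RestartingTcp, the helper lines, and the backoff values are made up for illustration. It assumes an Akka version that still ships the three-argument withBackoff overload (later releases deprecate it in favour of RestartSettings).

```scala
import java.net.InetSocketAddress

import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, RestartSource, Source, Tcp}
import akka.util.ByteString

object RestartingTcp {
  // Hypothetical helper: a source of text lines that reconnects (with backoff)
  // whenever the wrapped TCP flow fails, e.g. on idle timeout, or completes.
  def lines(bsAddress: String, bsPort: Int)(implicit system: ActorSystem): Source[String, NotUsed] =
    RestartSource.withBackoff(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
      Source.single(ByteString.empty)
        .via(
          Tcp().outgoingConnection(
            remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
            connectTimeout = 1.second,
            idleTimeout = 2.seconds // illustrative value, same as above
          )
        )
        .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
        .map(_.utf8String)
    }
}
```

RestartingTcp.lines(bsAddress, bsPort) could then take the place of the Source(...).via(Tcp()...) part of the original stream, so the Kafka sink keeps running across reconnects.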

3
On

If everything goes silent, it could be down to backpressure being applied somewhere. Try selectively replacing your backpressure-aware stages with non-backpressure-aware stages and check whether the problem is still there. In your case there are 2 possible sources of backpressure:

1) the TCP connection

You can try attaching an infinite source of strings to Kafka instead, doing something along the lines of:

Source.cycle(() => List(???).iterator)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

2) the Kafka sink

Replace it with some logging:

Source(IndexedSeq(ByteString.empty))
.via(
  Tcp().outgoingConnection(bsAddress, bsPort)
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runForeach(println)
.onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

Can you see the problem in only one of the 2 cases? In both? In none?
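A complementary check that does not require swapping stages is backpressureTimeout, which fails the stream when downstream signals no demand for a given time after an element was emitted. The sketch below is only an illustration under assumptions: BackpressureProbe, run and stuckSink are hypothetical names, stuckSink is a stand-in for a sink that stops pulling (not the Kafka sink), and an Akka 2.6-style implicit materializer from the ActorSystem is assumed.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

object BackpressureProbe {
  // Runs `source` into `sink`, failing the stream with a TimeoutException if
  // the sink signals no demand within `limit` after an element was emitted.
  def run[T](source: Source[T, _], sink: Sink[T, Future[Done]], limit: FiniteDuration)(
      implicit system: ActorSystem): Future[Done] =
    source.backpressureTimeout(limit).runWith(sink)

  // Stand-in for a sink that stops pulling: the first element never finishes
  // processing, so no further demand reaches upstream.
  def stuckSink[T]: Sink[T, Future[Done]] =
    Flow[T].mapAsync(1)(_ => Promise[Done]().future).toMat(Sink.ignore)(Keep.right)
}
```

Placed just before the real sink, a failure from this stage would point at the sink side. If the stream goes silent without this stage failing, the source side is the more likely culprit.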

0
On

I'd suggest creating the flow with supervision attributes to handle possible exceptions in your TCP connection, like this:

val flow =
  Tcp().outgoingConnection("", 12)
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .map(_.utf8String)
    .withAttributes(ActorAttributes.supervisionStrategy {
      // Log the exception and keep the stream running
      case ex: Throwable =>
        println("Error occurred: " + ex)
        Supervision.Resume
    })

and

Source(IndexedSeq(ByteString.empty))
.via(flow)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

By default, any error in the flow stops the stream. With this configuration you'll see whether the flow has thrown an exception.
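To see what the decider does in isolation, here is a small self-contained sketch. It only demonstrates Supervision.Resume on a map stage; it does not show how the TCP stage itself reacts to connection problems. The names SupervisionSketch, logAndResume and parse are hypothetical, and an Akka 2.6-style implicit materializer from the ActorSystem is assumed.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

object SupervisionSketch {
  // Logs the exception and drops the offending element instead of failing the stream.
  val logAndResume: Supervision.Decider = { ex =>
    println("Error occurred: " + ex)
    Supervision.Resume
  }

  // `toInt` throws NumberFormatException on non-numeric input.
  val parse = Flow[String]
    .map(_.toInt)
    .withAttributes(ActorAttributes.supervisionStrategy(logAndResume))

  // Runs a fixed input through `parse` and waits for the collected result.
  def run()(implicit system: ActorSystem): Seq[Int] =
    Await.result(Source(List("1", "2", "oops", "4")).via(parse).runWith(Sink.seq), 5.seconds)
}
```

Calling SupervisionSketch.run() logs the NumberFormatException for "oops" and returns the remaining parsed elements, so the error becomes visible without terminating the stream.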