Kafka streams.allMetadata() method returns empty list


So I am trying to get interactive queries working with Kafka Streams. I have Zookeeper and Kafka running locally (on Windows), where I use C:\temp as the storage folder for both Zookeeper and Kafka.

I have set up the topics like this

kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-output-topic
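
To sanity-check the setup, the topics can be listed back afterwards (same bin folder, same Zookeeper address):

kafka-topics.bat --zookeeper localhost:2181 --list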

Reading I have done around this issue

I have read this documentation page : http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

I have also read the Java example here : https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java

I have also read this similar post, which initially sounded like the same issue as mine : Cannot access KTable from a different app as StateStore

So that is my setup. What's the issue?

As I say, I am trying to create my own app that allows interactive queries via a custom Akka HTTP REST API (RPC calls, as recommended) so that I can query my KTable. The actual stream processing seems to be happening as expected, and I am able to print the results of the KTable, which match what is produced on the topic.

So the storage side of things seems to be working.

The problem arises when attempting to use the streams.allMetadata() method, which returns an empty list.

I am using

  • Scala 2.12
  • SBT
  • Akka HTTP 10.0.9 for the REST API
  • Kafka 0.11.0

Producer code

Here is the code for my producer

package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

   run()

    private def run(): Unit = {

      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("[email protected]","[email protected]", 1.5f),
        Ranking("[email protected]","[email protected]", 1.5f),
        Ranking("[email protected]","[email protected]", 3.5f),
        Ranking("[email protected]","[email protected]", 2.5f),
        Ranking("[email protected]","[email protected]", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random ranking from the list every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}
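
For reference, the supporting types referenced by the producer look roughly like this (only toEmail actually appears in the code above, so the other field names and the store name string here are just illustrative; JSONSerde[T] is a Kafka Serde[T] that round-trips the case class as JSON bytes):

package Entities {
  // the value type flowing through the topics (field names other than toEmail are illustrative)
  case class Ranking(fromEmail: String, toEmail: String, score: Float)
}

package Topics {
  // topic names used by both the producer and the streams app
  object RatingsTopics {
    val RATING_SUBMIT_TOPIC = "rating-submit-topic"
    val RATING_OUTPUT_TOPIC = "rating-output-topic"
  }
}

package Stores {
  // name of the queryable state store (the actual string is arbitrary)
  object StateStores {
    val RANKINGS_BY_EMAIL_STORE = "rankings-by-email-store"
  }
}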

Streams Code

Here is the streams code

def createRatingStreamsProperties() : Properties = {
  val props = createBasicStreamProperties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
  props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props
}

private def createBasicStreamProperties() : Properties = {
  val props = new Properties()
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  // Records should be flushed every 10 seconds. This is less than the default
  // in order to keep this example interactive.
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  // For illustrative purposes we disable record caches
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
  props
}

And the actual code

import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import Utils.Settings

package Processing.Ratings {

import Stores.StateStores
import org.apache.kafka.streams.state.HostInfo


class DummyRankingReducer extends Reducer[Ranking] {
  override def apply(value1: Ranking, value2: Ranking): Ranking = {
    value2
  }
}

class RankingByEmailInitializer extends Initializer[List[Ranking]] {
  override def apply(): List[Ranking] = List[Ranking]()
}

class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] {
  override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
    value :: aggregate
  }
}


object RatingStreamProcessingApp extends App {

  run()

  private def run() : Unit = {
    val stringSerde = Serdes.String
    val rankingSerde = new JSONSerde[Ranking]
    val listRankingSerde = new JSONSerde[List[Ranking]]
    val builder: KStreamBuilder = new KStreamBuilder
    val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

    val rankingTable = rankings.groupByKey(stringSerde,rankingSerde)
      .aggregate(
        new RankingByEmailInitializer(),
        new RankingByEmailAggregator(),
        listRankingSerde,
        StateStores.RANKINGS_BY_EMAIL_STORE
      )

    rankingTable.toStream.print()

    val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
    val restEndpoint:HostInfo  = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
    System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
    System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // We opt for this unconditional call here because this will make it easier for you to play around with the example
    // when resetting the application for doing a re-run (via the Application Reset Tool,
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
    //
    // The drawback of cleaning up local state prior is that your app must rebuilt its local state from scratch, which
    // will take time and will require reading all the state-relevant data from the Kafka cluster over the network.
    // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it
    // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app).
    // See `ApplicationResetExample.java` for a production-like example.
    //streams.cleanUp();
    streams.start()
    val restService = new RatingRestService(streams, restEndpoint)
    restService.start()


    //****************************************************************
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE?
    //****************************************************************


    val SIZE = streams.allMetadata.size()
    val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

    val range = keyValueStore.all
    val HASNEXT = range.hasNext
    import org.apache.kafka.streams.KeyValue
    while (range.hasNext) {
      val next = range.next
      System.out.println(String.format("key: %s | value: %s", next.key, next.value))
    }

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
      restService.stop
    }))

    //return unit
    ()
  }
}

}

Where I have this config

kafka {
    bootStrapServers = "localhost:9092"
    zooKeepers = "zookeeper:2181"
    schemaRegistryUrl = "http://localhost:8081"
    partition = 0,
    restApiDefaultHostName = "localhost",
    restApiDefaultPort = "8080"
}
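
The Settings object used throughout the code just reads that block. A minimal sketch of it, assuming Typesafe Config and showing only the values used in this post, looks like this (createRatingStreamsProperties / createBasicStreamProperties from earlier live in here too):

package Utils

import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerConfig

object Settings {

  private val config = ConfigFactory.load()

  val bootStrapServers = config.getString("kafka.bootStrapServers")
  val restApiDefaultHostName = config.getString("kafka.restApiDefaultHostName")
  val restApiDefaultPort = config.getString("kafka.restApiDefaultPort").toInt

  // bare-bones producer config; the producer app adds acks etc. on top of this
  def createBasicProducerProperties: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
    props
  }
}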

REST Service stuff

Scala port of the example file : https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/MetadataService.java

package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import Entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._


/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {


   /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key   The key to find
    * @return { @link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    return new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }




  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {

    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }



}
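
HostStoreInfo itself is just a small entity plus a spray-json format so Akka HTTP can marshal it on the /instances route. Something like this (the storeNames field name and the exact formats are assumptions on my side; the REST code below does import Entities.AkkaHttpEntitiesJsonFormats._):

package Entities

import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

// host/port of a streams instance plus the state stores it currently hosts
case class HostStoreInfo(host: String, port: Int, storeNames: List[String])

object AkkaHttpEntitiesJsonFormats {
  implicit val RankingFormat: RootJsonFormat[Ranking] = jsonFormat3(Ranking)
  implicit val HostStoreInfoFormat: RootJsonFormat[HostStoreInfo] = jsonFormat3(HostStoreInfo)
}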

And here is the REST service (I have only attempted to get the "instances" route working at the moment).

package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.AkkaHttpEntitiesJsonFormats._
import Entities._
import akka.http.scaladsl.marshalling.ToResponseMarshallable

import scala.concurrent.Future


object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
}


class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher


  def start() : Unit = {
    val emailRegexPattern =  """\w+""".r


    val route =
      path("ratingByEmail" / emailRegexPattern) { email =>
        get {

          //TODO : This would come from Kafka store, either local or remote

          complete(ToResponseMarshallable.apply(List[Ranking](
            Ranking("[email protected]", "[email protected]", 4.0f),
            Ranking("[email protected]", "[email protected]", 2.0f)))
          )
        }
      } ~
      path("instances") {
        get {
          val x = metadataService.streamsMetadata
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }


    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }


  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}
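
With the streams app (and therefore the REST service) running, the instances route can be exercised with a plain HTTP GET, e.g.:

curl http://localhost:8080/instances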

Here is proof that there is data in the store

(screenshot: producer running)

(screenshot: streams app running, printing the KTable)

This is from running the producer first, then the streams app, and then the producer again (another run).

You can see the results from the KTable being shown; I then started the producer again and pushed some more messages through the topic, which the streams app picked up.

But when I query my REST endpoint to try to get the metadata using localhost:8080/instances, all I get is an empty list []

(screenshot: /instances returning an empty list)

I would have expected these lines from the streams code above to return some metadata; there is clearly something in the store, so why is there no metadata?

val SIZE = streams.allMetadata.size()
val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

Both of these return 0, whilst iterating through the items in the store using this code

import org.apache.kafka.streams.state.KeyValueIterator
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

val range = keyValueStore.all
val HASNEXT = range.hasNext
import org.apache.kafka.streams.KeyValue
while (range.hasNext) {
  val next = range.next
  System.out.println(String.format("key: %s | value: %s", next.key, next.value))
}

Produces data from the store

(screenshot: data printed from the store)

I know the REST API is working OK, as the hardcoded test route is working fine

(screenshot: hardcoded test route returning data)

What am I doing wrong???

1 Answer

BEST ANSWER

So I figured this out; it turns out it was due to this missing config value:

props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,  "localhost:8080")
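
So with that in place, my createBasicStreamProperties from earlier ends up looking roughly like this (the value should be the host:port that the interactive-query REST service binds to, here taken from the settings above):

private def createBasicStreamProperties() : Properties = {
  val props = new Properties()
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  // the missing piece: advertises where THIS instance can be reached for
  // interactive queries; without it allMetadata()/allMetadataForStore()
  // have nothing to report
  props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
    s"$restApiDefaultHostName:$restApiDefaultPort")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
  props
}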

Once I added that, the Akka HTTP REST API at http://localhost:8080/instances started to work. But then I started getting this weird exception:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)

So after reading about this one here : http://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

I decided what I needed to do was carry out some retry logic, which I did like this:

Retry

Which I borrowed from here : https://gist.github.com/Mortimerp9/5430595

package Utils

import scala.concurrent._
import scala.concurrent.duration._


object Retry {

  /**
    * exponential back off for retry
    */
  def exponentialBackoff(r: Int): Duration = (scala.math.pow(2, r).round * 500).milliseconds

  def noIgnore(t: Throwable): Boolean = false

  /**
    * retry a particular block that can fail
    *
    * @param maxRetry how many times to retry before giving up
    * @param deadline how long to retry before giving up; default None
    * @param backoff a back-off function that returns a Duration after which to retry; default is an exponential back-off in 500 millisecond steps
    * @param ignoreThrowable if you want to stop retrying on a particular exception
    * @param block a block of code to retry
    * @param ctx an execution context in which to execute the block
    * @return an eventual Future succeeded with the computed value or failed with one of:
    *   `TooManyRetriesException`   if there were too many retries without an exception being caught. Probably impossible if you pass decent parameters
    *   `DeadlineExceededException` if the retry didn't succeed before the provided deadline
    *   `TimeoutException`  if you provide a deadline and the block takes too long to execute
    *   `Throwable` the last encountered exception
    */
  def retry[T](maxRetry: Int,
               deadline: Option[Deadline] = None,
               backoff: (Int) => Duration = exponentialBackoff,
               ignoreThrowable: Throwable => Boolean = noIgnore)(block: => T)(implicit ctx: ExecutionContext): Future[T] = {

    class TooManyRetriesException extends Exception("too many retries without exception")
    class DeadlineExceededException extends Exception("deadline exceeded")

    val p = Promise[T]

    def recursiveRetry(retryCnt: Int, exception: Option[Throwable])(f: () => T): Option[T] = {
      if (maxRetry == retryCnt
        || deadline.isDefined && deadline.get.isOverdue) {
        exception match {
          case Some(t) =>
            p failure t
          case None if deadline.isDefined && deadline.get.isOverdue =>
            p failure (new DeadlineExceededException)
          case None =>
            p failure (new TooManyRetriesException)
        }
        None
      } else {
        val success = try {
          val rez = if (deadline.isDefined) {
            Await.result(Future(f()), deadline.get.timeLeft)
          } else {
            f()
          }
          Some(rez)
        } catch {
          case t: Throwable if !ignoreThrowable(t) =>
            blocking {
              val interval = backoff(retryCnt).toMillis
              Thread.sleep(interval)
            }
            recursiveRetry(retryCnt + 1, Some(t))(f)
          case t: Throwable =>
            p failure t
            None
        }
        success match {
          case Some(v) =>
            p success v
            Some(v)
          case None => None
        }
      }
    }

    def doBlock() = block

    Future {
      recursiveRetry(0, None)(doBlock)
    }

    p.future
  }

}

Which I call like this

def printStoreMetaData(streams:KafkaStreams) : Unit = {

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    import scala.util.{Success, Failure}

    val keyValueStoreTry = waitUntilStoreIsQueryable(
      StateStores.RANKINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String,List[Ranking]](),
      streams
    ) match {
      case Success(keyValueStore) => {
        val SIZE = streams.allMetadata.size()
        val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()
        val range = keyValueStore.all
        val HASNEXT = range.hasNext
        import org.apache.kafka.streams.KeyValue
        while (range.hasNext) {
          val next = range.next
          System.out.println(String.format("key: %s | value: %s", next.key, next.value))
        }
      }
      case Failure(f) => println(f)
    }

}
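
For completeness, waitUntilStoreIsQueryable is not shown above; it is just a small wrapper that keeps calling streams.store(...) through the Retry helper until the store stops throwing InvalidStateStoreException. A rough sketch of it (the object name, retry count and timeout here are arbitrary choices):

package Utils

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreType

object StoreQueryUtils {

  // keeps asking KafkaStreams for the store until it stops throwing
  // InvalidStateStoreException (i.e. until rebalancing / state restoration is done)
  def waitUntilStoreIsQueryable[T](storeName: String,
                                   queryableStoreType: QueryableStoreType[T],
                                   streams: KafkaStreams): Try[T] = {
    Try {
      Await.result(
        Retry.retry(maxRetry = 10) {
          streams.store(storeName, queryableStoreType)
        },
        60.seconds)
    }
  }
}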

After doing that, it's all happy days for me.