I've got a Scala application that uses Kafka Streams - and Embedded Kafka Schema Registry in its integration tests.

I'm currently trying to upgrade Kafka Streams from 2.5.1 to 3.3.1 - and everything is working locally as expected, with all unit and integration tests passing.

However, according to the upgrade guide on the Kafka Streams documentation, when upgrading Kafka Streams, "if upgrading from 3.2 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" (possible values are "0.10.0" - "3.2") and during the second you remove it".

I've therefore added this upgrade.from config to my code as follows:

val propsMap = Map(
  ...
  UPGRADE_FROM_CONFIG -> "2.5.1"
)

val props = new Properties()
properties.putAll(asJava(propsMap))

val streams = new KafkaStreams(topology, props);

However, doing this causes my integration tests to start failing with the following error:

[info]   java.net.BindException: Address already in use
[info]   at sun.nio.ch.Net.bind0(Native Method)
[info]   at sun.nio.ch.Net.bind(Net.java:461)
[info]   at sun.nio.ch.Net.bind(Net.java:453)
[info]   at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:222)
[info]   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
[info]   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:78)
[info]   at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:676)
[info]   at org.apache.zookeeper.server.ServerCnxnFactory.configure(ServerCnxnFactory.java:109)
[info]   at org.apache.zookeeper.server.ServerCnxnFactory.configure(ServerCnxnFactory.java:105)
[info]   at io.github.embeddedkafka.ops.ZooKeeperOps.startZooKeeper(zooKeeperOps.scala:26)

Does anyone know why that might be happening and how to resolve? And also additionally, if this use of the upgrade.from config is correct?

For additional context, my previous versions of the relevant libraries were:

"org.apache.kafka" %% "kafka-streams-scala" % "2.5.1"
"org.apache.kafka" % "kafka-clients" % "5.5.1-ccs"
"io.confluent" % "kafka-avro-serializer" % "5.5.1"
"io.confluent" % "kafka-schema-registry-client" % "5.5.1"
"org.apache.kafka" %% "kafka" % "2.5.1"
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % "5.5.1"

And my updated versions are:

"org.apache.kafka" %% "kafka-streams-scala" % "3.3.1"
"org.apache.kafka" % "kafka-clients" % "7.3.0-ccs"
"io.confluent" % "kafka-avro-serializer" % "7.3.0"
"io.confluent" % "kafka-schema-registry-client" % "7.3.0"
"org.apache.kafka" %% "kafka" % "3.3.1"
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % "7.3.0"

My integration tests use Embedded Kafka Schema Registry as follows in their test setup, with specific ports specified for Kafka, Zookeeper and Schema Registry:

class MySpec extends AnyWordSpec
    with EmbeddedKafkaConfig
    with EmbeddedKafka {

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    EmbeddedKafka.start()
    ...
  }

  override protected def afterAll(): Unit = {
    ...
    EmbeddedKafka.stop()
    super.afterAll()
  }
}

I'm not quite sure what to try to resolve this issue.

In searching online, did find this open GitHub issue on Scalatest Embedded Kafka, which was the precursor to Embedded Kafka Schema Registry and seems to be a similar issue. However, it doesn't appear to have been resolved.

1

There are 1 best solutions below

0
On

Your config upgrade_from is not valid.

Cf https://kafka.apache.org/documentation/#streamsconfigs_upgrade.from

It should be 2.5, not 2.5.1.