We're developing a stateful Kafka Streams application in Kotlin that consumes records from two topics and produces records to a third. The application runs in OpenShift as a StatefulSet with three pods.
We've been using Ktor with Netty as the server framework, but after running into an `IllegalStateException` we decided to stop using `fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)` as the entry point. Instead, we're using the following configuration in our `Application.kt` file:
```kotlin
// Imports assume Ktor 2.x and Micrometer's Prometheus registry; our own
// helpers (ILogging, Logging, PrometheusObject, ConfigVariables,
// TopicSerdeConfig, createStream, and the controller functions) are
// defined elsewhere in the project.
import com.typesafe.config.ConfigFactory
import io.ktor.serialization.gson.gson
import io.ktor.server.application.call
import io.ktor.server.application.install
import io.ktor.server.config.HoconApplicationConfig
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.server.netty.NettyApplicationEngine
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
import io.ktor.server.response.respondRedirect
import io.ktor.server.routing.Routing
import io.ktor.server.routing.get
import io.ktor.server.routing.routing
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import java.time.Duration

object App : ILogging by Logging<App>()

fun main() {
    val promRegistry = PrometheusObject.prometheusRegistry
    val kafkaStreams = configureStreams(promRegistry)
    val nettyServer = configureNettyServer(kafkaStreams)
    addShutdownHook(kafkaStreams, promRegistry, nettyServer)

    App.log.atInfo().setMessage("Starting Kafka Streams").log()
    kafkaStreams.start()
    App.log.atInfo().setMessage("Starting Netty Server").log()
    // Blocks the main thread until the server is stopped.
    nettyServer.start(wait = true)
}

fun configureStreams(promRegistry: PrometheusMeterRegistry): KafkaStreams {
    val appConfig = ConfigVariables(HoconApplicationConfig(ConfigFactory.load()))
    val topicSerdeConfig =
        TopicSerdeConfig(
            appConfig.streamInputTopic,
            appConfig.streamOutputTopic,
            appConfig.schemaRegistry,
        )
    val kafkaStreams = createStream(appConfig, topicSerdeConfig)
    // On an uncaught exception in a stream thread, shut down only this
    // client rather than the whole JVM.
    val customExceptionHandler =
        StreamsUncaughtExceptionHandler {
            App.log.atError()
                .setMessage("Error running stream - the client will shut down!")
                .addKeyValue("exception", it)
                .log()
            StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
        }
    kafkaStreams.setUncaughtExceptionHandler(customExceptionHandler)
    KafkaStreamsMetrics(kafkaStreams).bindTo(promRegistry)
    return kafkaStreams
}

fun configureNettyServer(streams: KafkaStreams): NettyApplicationEngine {
    return embeddedServer(Netty, port = 8080) {
        App.log.atInfo().setMessage("Setting up server endpoints").log()
        install(Routing) {
            prometheusController()
            healthController(streams)
            configController()
        }
        App.log.atInfo().setMessage("Setting /prometheus as default route path").log()
        routing {
            get("/") {
                call.respondRedirect("/prometheus", permanent = true)
            }
        }
        App.log.atInfo().setMessage("Installing Content Negotiation").log()
        install(ContentNegotiation) {
            gson {
                setPrettyPrinting()
                ignoreType<String>()
            }
        }
    }
}

fun addShutdownHook(
    streams: KafkaStreams,
    promRegistry: PrometheusMeterRegistry,
    server: NettyApplicationEngine,
) {
    App.log.atInfo().setMessage("Adding shutdown hook").log()
    Runtime.getRuntime().addShutdownHook(
        Thread {
            try {
                App.log.atInfo().setMessage("Running Shutdown Hook").log()
                App.log.atInfo().setMessage("Closing Kafka Streams").log()
                streams.close(Duration.ofSeconds(12))
                App.log.atInfo().setMessage("Closing Prometheus Registry").log()
                promRegistry.close()
                App.log.atInfo().setMessage("Stopping Netty Server").log()
                // Grace period 15 s, hard timeout 25 s.
                server.stop(15000, 25000)
            } catch (e: Exception) {
                App.log.atError().setMessage("Error During Shutdown").log()
                e.printStackTrace()
            }
        },
    )
}
```
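For reference, `createStream` is omitted above for brevity. In rough outline it builds the two-topics-in, one-topic-out topology described at the top; the following is a simplified, stand-alone sketch (the parameter names here are placeholders for our `ConfigVariables`/`TopicSerdeConfig` fields, and the plain `merge` stands in for the real stateful processing):

```kotlin
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import java.util.Properties

// Simplified sketch only: the real createStream takes our ConfigVariables
// and TopicSerdeConfig, and stateful processing replaces the plain merge.
fun createStreamSketch(
    inputTopicA: String,
    inputTopicB: String,
    outputTopic: String,
    props: Properties,
): KafkaStreams {
    val builder = StreamsBuilder()
    // Consume both input topics, combine them, and write to the output topic.
    builder.stream<String, String>(inputTopicA)
        .merge(builder.stream<String, String>(inputTopicB))
        .to(outputTopic)
    return KafkaStreams(builder.build(), props)
}
```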
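Similarly, `healthController` is what ties the HTTP server to the streams client: it exposes the Kafka Streams state over HTTP for the OpenShift probes. A simplified sketch of its shape (the exact path and response bodies here are illustrative):

```kotlin
import io.ktor.http.HttpStatusCode
import io.ktor.server.application.call
import io.ktor.server.response.respondText
import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import org.apache.kafka.streams.KafkaStreams

// Simplified sketch: report healthy while the streams client is running
// or rebalancing, unavailable otherwise.
fun Route.healthController(streams: KafkaStreams) {
    get("/health") {
        if (streams.state().isRunningOrRebalancing) {
            call.respondText("UP")
        } else {
            call.respondText("DOWN", status = HttpStatusCode.ServiceUnavailable)
        }
    }
}
```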
Unfortunately, this implementation throws a `java.util.concurrent.RejectedExecutionException` with the message `Failed to submit a listener notification task. Event loop shut down?`, and the affected pod restarts as a result.
Is the logic in our `Application.kt` file a valid configuration for running a Netty server in OpenShift? And what might cause the `RejectedExecutionException`?