RejectedExecutionException when running Kafka + Netty in OpenShift


We're developing a stateful Kafka application in Kotlin that consumes records from two topics and produces records to a third. The application runs as a StatefulSet in OpenShift with three pods.

We've been using Ktor + Netty as the server framework, but because of an IllegalStateException we stopped starting the server with fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args).

Instead, we're using the following configuration in our Application.kt file:


object App : ILogging by Logging<App>()

fun main() {
    val promRegistry = PrometheusObject.prometheusRegistry

    val kafkaStreams = configureStreams(promRegistry)
    val nettyServer = configureNettyServer(kafkaStreams)

    addShutdownHook(kafkaStreams, promRegistry, nettyServer)

    App.log.atInfo().setMessage("Starting Kafka Streams").log()
    kafkaStreams.start()

    App.log.atInfo().setMessage("Starting Netty Server").log()
    nettyServer.start(wait = true)
}

fun configureStreams(promRegistry: PrometheusMeterRegistry): KafkaStreams {
    val appConfig = ConfigVariables(HoconApplicationConfig(ConfigFactory.load()))

    val topicSerdeConfig =
        TopicSerdeConfig(
            appConfig.streamInputTopic,
            appConfig.streamOutputTopic,
            appConfig.schemaRegistry,
        )

    val kafkaStreams = createStream(appConfig, topicSerdeConfig)

    val customExceptionHandler =
        StreamsUncaughtExceptionHandler {
            App.log.atError()
                .setMessage("Error running stream - the client will shut down!")
                .setCause(it) // attach the throwable so the stack trace is logged
                .log()
            StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
        }
    kafkaStreams.setUncaughtExceptionHandler(customExceptionHandler)
    KafkaStreamsMetrics(kafkaStreams).bindTo(promRegistry)

    return kafkaStreams
}

fun configureNettyServer(streams: KafkaStreams): NettyApplicationEngine {
    return embeddedServer(Netty, port = 8080) {
        App.log.atInfo().setMessage("Setting up server endpoints").log()
        install(Routing) {
            prometheusController()
            healthController(streams)
            configController()
        }

        App.log.atInfo().setMessage("Setting /prometheus as default route path").log()
        routing {
            get("/") {
                call.respondRedirect("/prometheus", true)
            }
        }

        App.log.atInfo().setMessage("Installing Content Negotiation").log()
        install(ContentNegotiation) {
            gson {
                setPrettyPrinting()
                ignoreType<String>()
            }
        }
    }
}

fun addShutdownHook(
    streams: KafkaStreams,
    promRegistry: PrometheusMeterRegistry,
    server: NettyApplicationEngine,
) {
    App.log.atInfo().setMessage("Adding shutdown hook").log()
    Runtime.getRuntime().addShutdownHook(
        Thread {
            try {
                App.log.atInfo().setMessage("Running Shutdown Hook").log()

                App.log.atInfo().setMessage("Closing Kafka Streams").log()
                streams.close(Duration.ofSeconds(12))

                App.log.atInfo().setMessage("Closing Prometheus Registry").log()
                promRegistry.close()

                App.log.atInfo().setMessage("Stopping Netty Server").log()
                server.stop(15000, 25000)
            } catch (e: Exception) {
                // Attach the exception to the log event instead of printing it to stderr.
                App.log.atError().setMessage("Error During Shutdown").setCause(e).log()
            }
        },
    )
}
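One thing that may be worth checking in the shutdown hook above is its time budget. The sketch below only adds up the timeouts that appear in the code (12 s for streams.close and 25 000 ms as the timeoutMillis of server.stop) and compares the total with a termination grace period. The 30-second value is an assumption (the Kubernetes default for terminationGracePeriodSeconds); the actual value for these pods isn't stated here, and worstCaseShutdown is a hypothetical helper, not part of the application.

```kotlin
import java.time.Duration

// Worst-case time the shutdown hook may block, assuming each step uses its
// full timeout and the steps run one after another (as they do in the hook).
fun worstCaseShutdown(streamsClose: Duration, serverStopTimeout: Duration): Duration =
    streamsClose.plus(serverStopTimeout)

fun main() {
    val streamsClose = Duration.ofSeconds(12)          // streams.close(Duration.ofSeconds(12))
    val serverStopTimeout = Duration.ofMillis(25_000)  // timeoutMillis in server.stop(15000, 25000)
    // ASSUMPTION: the pod uses the Kubernetes default terminationGracePeriodSeconds.
    val assumedGracePeriod = Duration.ofSeconds(30)

    val worstCase = worstCaseShutdown(streamsClose, serverStopTimeout)
    println("worst case: ${worstCase.seconds}s, grace period: ${assumedGracePeriod.seconds}s")
    println("fits: ${worstCase <= assumedGracePeriod}")
}
```

Under that assumption the hook could need up to 37 seconds, which is more than the grace period, so the container might be killed before the server has finished stopping. Whether this is related to the exception is not established.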

Unfortunately, this implementation throws a java.util.concurrent.RejectedExecutionException with the message "Failed to submit a listener notification task. Event loop shut down?", and the affected pod then restarts.
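For context on the exception type itself: on the JVM, a RejectedExecutionException is what an executor raises when a task is handed to it after it has been shut down, and the message suggests Netty ran into that case while notifying a future listener. The sketch below is not Netty code. It uses a plain java.util.concurrent executor as a stand-in for an event loop, and submitAfterShutdownIsRejected is a made-up name for illustration.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException

// Returns true if handing work to an already shut down executor is rejected.
fun submitAfterShutdownIsRejected(): Boolean {
    val executor = Executors.newSingleThreadExecutor()
    executor.shutdown() // stands in for an event loop that has already been stopped
    return try {
        executor.execute { println("never runs") }
        false
    } catch (e: RejectedExecutionException) {
        true
    }
}

fun main() {
    println("rejected = ${submitAfterShutdownIsRejected()}")
}
```

If the same mechanism applies here, the question becomes which code path still submits work to the Netty event loop after it has been stopped, for example a request or listener that completes while the shutdown hook is running.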

Is the logic used in the Application.kt file a valid configuration for running a Netty server in OpenShift? And what might cause the RejectedExecutionException?
