Intermittent librdkafka consumer crash with `malloc(): mismatching next->prev_size (unsorted)`


I am using a single-node Kafka setup as a message broker between a C application (the model) and a Python application (the controller). Both sides, i.e. the model and the controller, produce and consume on separate topics. The workflow is as follows:

  1. The C app (model) waits until it consumes a message produced by the Python app (controller).
  2. The C app then does its processing and produces the result, which is in turn consumed by the Python app, and the cycle repeats.

The apps are supposed to run for long periods (3-5 months), and each produce-consume cycle happens at roughly 1-5 second intervals. Ideally, I want a synchronous producer and a synchronous consumer, i.e. execution should block until data is received or delivered.
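
For reference, the shape of one such cycle on the C side is roughly the sketch below. This is only a sketch of the pattern I am after, not my actual code: rk_cons/rk_prod are assumed to be separate consumer and producer handles, partition is the single partition I use, the reply payload is a placeholder, and rd_kafka_consume_start() is assumed to have been called on rkt_cons already.

#include <string.h>
#include <librdkafka/rdkafka.h>

/* One synchronous request/response cycle: block until the controller's
 * message arrives, process it, produce the result and wait for delivery. */
static int run_one_cycle(rd_kafka_t *rk_cons, rd_kafka_topic_t *rkt_cons,
                         rd_kafka_t *rk_prod, rd_kafka_topic_t *rkt_prod,
                         int32_t partition)
{
    rd_kafka_message_t *rkm;

    /* Block until the controller produces a request. */
    do {
        rd_kafka_poll(rk_cons, 0); /* serve callbacks */
        rkm = rd_kafka_consume(rkt_cons, partition, 1000);
    } while (!rkm);

    if (rkm->err) { /* error or EOF event, not a real message */
        rd_kafka_message_destroy(rkm);
        return -1;
    }

    /* ... process rkm->payload / rkm->len here ... */
    const char *reply = "model result"; /* placeholder payload */
    size_t reply_len = strlen(reply);
    rd_kafka_message_destroy(rkm);

    /* Produce the result and wait for delivery so the producer side is
     * effectively synchronous. */
    if (rd_kafka_produce(rkt_prod, partition, RD_KAFKA_MSG_F_COPY,
                         (void *)reply, reply_len, NULL, 0, NULL) == -1)
        return -1;
    rd_kafka_flush(rk_prod, 10000); /* wait up to 10 s for delivery */

    return 0;
}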

The setup works fine for some time and then the C-side consumer crashes with the error:

malloc(): mismatching next->prev_size (unsorted).

I was able to capture the stack trace using GDB, and it is below:

malloc(): mismatching next->prev_size (unsorted)

Thread 8 "rdk:broker101" received signal SIGABRT, Aborted.
[Switching to Thread 0xb24f88c0 (LWP 16274)]
__GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:50
50      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) where
#0  0xb61a0f14 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:50
#1  0xb618c230 in __GI_abort () at abort.c:79
#2  0xb61dc50c in __libc_message (action=action@entry=do_abort, fmt=<optimized out>) at ../sysdeps/posix/libc_fatal.c:181
#3  0xb61e3034 in malloc_printerr (str=<optimized out>) at malloc.c:5341
#4  0xb61e6998 in _int_malloc (av=av@entry=0xb1700010, bytes=bytes@entry=36) at malloc.c:3737
#5  0xb61e8308 in __GI___libc_malloc (bytes=36) at malloc.c:3057
#6  0xb65e351c in rd_malloc (sz=36) at rd.h:127
#7  0xb65e351c in rd_buf_alloc_segment0 (rbuf=rbuf@entry=0xb17032a8, size=36) at rdbuf.c:132
#8  0xb65e3610 in rd_buf_alloc_segment (rbuf=rbuf@entry=0xb17032a8, min_size=<optimized out>, max_size=<optimized out>) at rdbuf.c:165
#9  0xb65e3778 in rd_buf_write_ensure_contig (rbuf=rbuf@entry=0xb17032a8, size=<optimized out>) at rdbuf.c:194
#10 0xb6523df0 in rd_kafka_recv (rkb=rkb@entry=0xe5d08) at rdkafka_broker.c:1895
#11 0xb6546cb0 in rd_kafka_transport_io_event (rktrans=rktrans@entry=0xb1700c30, events=events@entry=1) at rdkafka_transport.c:740
#12 0xb6547728 in rd_kafka_transport_io_serve (rktrans=0xb1700c30, rktrans@entry=0xffffffff, timeout_ms=<optimized out>) at rdkafka_transport.c:801
#13 0xb652eaac in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0xe5d08, abs_timeout=169051526973) at rdtime.h:160
#14 0xb652f050 in rd_kafka_broker_consumer_serve (rkb=rkb@entry=0xe5d08, abs_timeout=<optimized out>) at rdkafka_broker.c:4982
#15 0xb65305f0 in rd_kafka_broker_serve (rkb=rkb@entry=0xe5d08, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:5087
#16 0xb6530988 in rd_kafka_broker_thread_main (arg=0xe5d08) at rdkafka_broker.c:5244
#17 0xb62c9494 in start_thread (arg=0xb24f88c0) at pthread_create.c:486

I have tried both the high-level consumer and the simple (legacy) consumer.
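
The high-level consumer attempt looked roughly like the sketch below (reconstructed and simplified, not the exact code: the handle is assumed to have been created with a group.id, the topic name is illustrative, and keep_running stands in for my actual shutdown flag).

#include <stdio.h>
#include <librdkafka/rdkafka.h>

static volatile int keep_running = 1; /* stand-in for my shutdown flag */

/* Subscribe-based (high-level) consume loop. */
static void consume_with_subscribe(rd_kafka_t *rk, const char *topic)
{
    rd_kafka_topic_partition_list_t *topics =
        rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(rk, topics) != RD_KAFKA_RESP_ERR_NO_ERROR)
        fprintf(stderr, "%% Subscribe failed\n");
    rd_kafka_topic_partition_list_destroy(topics);

    while (keep_running) {
        /* Serves the consumer and returns one message/event,
         * or NULL on timeout. */
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 1000);
        if (!rkm)
            continue;

        if (rkm->err)
            fprintf(stderr, "%% Consume error: %s\n",
                    rd_kafka_message_errstr(rkm));
        /* else: rkm->payload / rkm->len are valid until destroy */

        rd_kafka_message_destroy(rkm);
    }

    rd_kafka_consumer_close(rk);
}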

Any pointers in debugging this are appreciated.
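
The only extra librdkafka-side instrumentation I am aware of is the library's built-in debug logging, which can be enabled through the configuration before the handle is created, e.g. (a fragment inside the existing setup code; the chosen debug contexts are just an example):

char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();

/* Turn on librdkafka's internal debug logging (printed via the default
 * stderr logger); the context list here is only an example. */
if (rd_kafka_conf_set(conf, "debug", "broker,protocol,msg",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    fprintf(stderr, "%% %s\n", errstr);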

I am using the librdkafka v1.6.2 C library, the confluent-kafka Python package, and a Kafka broker running confluentinc/cp-latest (3.0).

The code in my consumer function is below:

int ping_controller_status(..)
{
    char *message;
    int msg_len = 0;
    printf("In the consumer now. \n");

    /* Start consuming */
    if (rd_kafka_consume_start(rkt_cons, partition, RD_KAFKA_OFFSET_END) ==
        -1)
    {
        err = rd_kafka_last_error();
        fprintf(stderr, "%% Failed to start consuming: %s\n",
                rd_kafka_err2str(err));
        if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
            fprintf(stderr,
                    "%% Broker based offset storage "
                    "requires a group.id, "
                    "add: -X group.id=yourGroup\n");
        exit(1);
    }

    while (keep_running)
    {
        rd_kafka_message_t *rkm;

        /* Poll for errors, etc. */
        rd_kafka_poll(rk_cons, 10000);

        /* Consume single message.
         * See rdkafka_performance.c for high speed
         * consuming of messages. */
        rkm = rd_kafka_consume(rkt_cons, partition, 10000);
        if (!rkm) /* timeout */
            continue;

        message = (char *)rkm->payload;
        msg_len = (int)rkm->len;

        /* Return message to rdkafka */
        rd_kafka_message_destroy(rkm);

        message[msg_len] = '\0';

        /*
         * Process message here
         */
        rd_kafka_consume_stop(rkt_cons, partition);
        return msg_len;
    }

    return 0; /* reached only if keep_running is cleared */
}

I have also tried keeping the consumer start/stop calls in the initialization function so that they are only called once at the beginning, but the behavior has not changed. Crashes still happen at random intervals during program execution, while the Python process stays up waiting for a response from the C app.
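
Concretely, that variant moved the start/stop calls into an init/shutdown pair, roughly like this (simplified; rkt_cons, rk_cons and partition are the same globals used by the consumer function above):

extern rd_kafka_t *rk_cons;        /* consumer handle (global in my code) */
extern rd_kafka_topic_t *rkt_cons; /* consumer topic handle */
extern int32_t partition;

static int init_consumer(void)
{
    /* Start fetching once, at program start. */
    if (rd_kafka_consume_start(rkt_cons, partition,
                               RD_KAFKA_OFFSET_END) == -1) {
        fprintf(stderr, "%% Failed to start consuming: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));
        return -1;
    }
    return 0;
}

static void shutdown_consumer(void)
{
    /* Stop fetching and tear down once, at program exit. */
    rd_kafka_consume_stop(rkt_cons, partition);
    rd_kafka_topic_destroy(rkt_cons);
    rd_kafka_destroy(rk_cons);
}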
