How to send json data using librdkafka?

527 Views Asked by At

I'm trying to send a json payload using librdkafka c api. What I'm trying to do now is

#include <jansson.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

typedef struct my_data {
    char *id;
    char *value;
    unsigned long timestamp;
} my_data;

char * my_data_to_json(const my_data *ev);
void rk_dr_callback(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque);

int main(int argc, char * argv[])
{
    // configure producer
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    char errstr[512];

    rd_kafka_conf_set(conf, "bootstrap.servers", "k1.example.com:9093", errstr, sizeof(errstr));    
    
    rd_kafka_conf_set_dr_msg_cb(conf, rk_dr_callback);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));    

    if (!rk) {
        fprintf(stderr, "failed to create kafka producer: %s\n", errstr);
        return -1;
    }

    // create json
    my_data ev = {
        .id = "test-id",
        .value = "test-value",
        .timestamp = (unsigned long) time(NULL)
    };

    char *json = my_data_to_json(&ev);
    printf("json dump: %s\n", json);

    // publish data
    rd_kafka_resp_err_t err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("topic"),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(json, strlen(json)),
        RD_KAFKA_V_KEY("key-1", strlen("key-1")), RD_KAFKA_V_OPAQUE(NULL),
        RD_KAFKA_V_END);

    // cleanup
    rd_kafka_flush(rk, 5 * 1000);
    
    if (rd_kafka_outq_len(rk) > 0) {
        fprintf(stderr, "%d message(s) were not delivered\n", rd_kafka_outq_len(rk));
    }

    rd_kafka_destroy(rk);
    free(json);

    return 0;
}

void rk_dr_callback(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
{
    if (msg->err) {
        printf("failed to send message: %s\n", rd_kafka_err2str(msg->err));
    }
    else {
        printf("delivered %zd bytes to partition %d\n", msg->len, msg->partition);
    }
}

char * my_data_to_json(const my_data *ev)
{
    /* build the JSON object {"id": "id", "value": "value", "timestamp": 12345678} */
    json_t *json = json_pack("{sssssi}", "id", ev->id, "value", ev->value, "timestamp", ev->timestamp);

    if (!json) {
        fprintf(stderr, "failed to construct json from data\n");
    }

    char *str = json_dumps(json, JSON_COMPACT);

    if (!str) {
        fprintf(stderr, "failed to encode json object\n");
    }

    return str;
}

Using the above code I manage to get the bytes to the broker. But the json payload seems to be malformed. The consumer (consumer is a C# client using Newtonsoft json lib to deserialise) throws the following error:

Newtonsoft.Json.JsonReaderException: Unexpected character encountered while parsing value: . Path '', line 0, position 0.\n at Newtonsoft.Json.JsonTextReader.ParseValue()\n at ...

I can't quite figure out whether my mistake is in the way I construct the json object, encode it to a string or the way I publish the json string using librdkafka.

0

There are 0 best solutions below