I want to use the camel-minio-sink-kafka-connector from the Apache Camel project to write messages from a Kafka topic to a Minio storage bucket. The problem is that the messages are in AVRO format and the connector seems unable to handle them: apparently there is no suitable type converter available, as the resulting exception states quite clearly (trimmed down to the root cause for brevity):

org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: org.apache.kafka.connect.data.Struct to the required type: java.io.InputStream
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:274)
    at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123)
    ... 29 more
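
For context, the sink connector itself is configured along these lines. This is trimmed down and written from memory: the connector class and the camel.kamelet.* keys follow the pattern of the recent kamelet-based releases and may differ for other versions, and all values are placeholders.

connector.class=org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector
topics=my-avro-topic
# The Avro converter is what turns the record value into an org.apache.kafka.connect.data.Struct
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# MinIO sink settings (keys approximate, values are placeholders)
camel.kamelet.minio-sink.endpoint=http://minio:9000
camel.kamelet.minio-sink.bucketName=my-bucket
camel.kamelet.minio-sink.accessKey=...
camel.kamelet.minio-sink.secretKey=...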

I tried simply adding my own type converter to the connector. The only instruction the documentation gives is to add a class with a suitably annotated conversion method, suggesting that the framework will then discover it automatically and register it as a converter. So that is what I did:

package org.apache.camel.kafkaconnector;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.camel.Converter;
import org.apache.kafka.connect.data.Struct;

@Converter(generateLoader = true)
public class TestConverter {
    @Converter
    public static InputStream toInputStream(Struct struct) {
        // just to test if this actually works...
        return new ByteArrayInputStream(struct.toString().getBytes(StandardCharsets.UTF_8));
    }
}
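
As far as I understand the discovery mechanism (this is my own reading, not something the connector documentation spells out), Camel picks such a class up either via a loader generated at build time (which is presumably what generateLoader = true is hinting at), or by scanning a service file on the classpath that lists the converter classes, along these lines:

META-INF/services/org/apache/camel/TypeConverter:

org.apache.camel.kafkaconnector.TestConverter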

However, the annotated class alone does not seem to be sufficient: I am still getting the same exception. So, what additional steps are required to register a suitable type conversion? Or should I try a different approach altogether?
