I want to use the camel-minio-sink-kafka-connector from the Apache Camel project to write messages from a Kafka topic to a MinIO storage bucket. The problem is that the messages are in Avro format, and the connector seems unable to handle them: apparently no type converter is available for that, as the resulting exception states quite clearly (trimmed down to the root cause for brevity):
org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: org.apache.kafka.connect.data.Struct to the required type: java.io.InputStream
at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:274)
at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123)
... 29 more
I tried simply adding my own type converter to the connector. The only instruction the documentation gives is to add a class with a suitable method and annotations, which suggests that the framework will discover it automatically and register it as a converter. So I did, like this:
package org.apache.camel.kafkaconnector;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.camel.Converter;
import org.apache.kafka.connect.data.Struct;

@Converter(generateLoader = true)
public class TestConverter {

    @Converter
    public static InputStream toInputStream(Struct struct) {
        // just to test if this actually works...
        return new ByteArrayInputStream(struct.toString().getBytes(StandardCharsets.UTF_8));
    }
}
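To rule out the conversion logic itself, the method body can be exercised outside Camel. This is only a minimal sketch: it assumes a plain Object as a hypothetical stand-in for Struct, so that it runs without Kafka Connect on the classpath, and the names ConverterSketch and roundTrip are illustrative, not part of the connector.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class ConverterSketch {

    // Same logic as TestConverter.toInputStream(Struct), but accepting any Object
    // (assumption: a stand-in for Struct, so no Kafka Connect dependency is needed).
    public static InputStream toInputStream(Object value) {
        return new ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8));
    }

    // Reads the stream back into a String so the round trip can be checked.
    public static String roundTrip(Object value) {
        try (InputStream in = toInputStream(value)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A single-entry map stands in for a one-field record.
        System.out.println(roundTrip(Map.of("field", "value")));
    }
}
```

If this round trip behaves as expected, the conversion method is not the issue, which leaves the way the converter is discovered and registered as the open question.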
However, that alone does not seem to be sufficient, because I still get the same exception. So, what additional steps are required to register a suitable type converter? Or should I try a different approach altogether?