Kafka Streams: How to instantiate a ConsumerRecordFactory?

I am trying to use the ConsumerRecordFactory provided by Kafka Streams to test a streaming application, mainly following the Confluent documentation. Here is the code I have so far:

// Properties of the application
Properties streamsConfiguration = new Properties();

// Give the Streams application a unique name.  The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");

// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
        "input-topic",
        new StringSerializer(),
        new IntegerSerializer()
);

// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);

My problem is that when I compile my code I get the following error:

Error:(70, 52) java: reference to create is ambiguous
  both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
  and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match

So I understand that Kafka Streams defines the generic method create(K,V,long), and that when I create my factory with non-generic types, I end up with a method that conflicts with the first one.

My question is: how am I supposed to instantiate my ConsumerRecordFactory?

I tried making my factory more generic with ConsumerRecordFactory<Object, Integer>, but then the inferred type doesn't match. I also can't find another example: the Confluent GitHub repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory, and this SO answer seems to use the same code as the documentation.

(I am aware that the problem is more about Java than about Kafka Streams, but I thought that tagging it with apache-kafka-streams would be a good way to reach people used to the ConsumerRecordFactory.)

Best answer

There are some issues in the code below:

// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
        "input-topic",
        new StringSerializer(),
        new IntegerSerializer()
);

// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);

  1. You have declared the value type as Integer in the ConsumerRecordFactory, but you are passing a Long value (42L) to the create() method.
  2. factory.create() returns a ConsumerRecord, not a ConsumerRecordFactory.

Regarding the ambiguity of the method, you are right. To avoid that issue, use the following, which passes the topic name explicitly to create():

ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
        new StringSerializer(),
        new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte[], byte[]> record = factory.create("input-topic", "key", 42);
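
The ambiguity itself can be reproduced in plain Java, without Kafka on the classpath. The following is only a minimal sketch: DemoFactory and OverloadDemo are hypothetical stand-ins that copy the shape of three create() overloads and return a label instead of a record, so this is not the real ConsumerRecordFactory. It assumes the overloads create(K,V,long), create(String,V,long) and create(String,K,V), matching the error message and the call used above.

```java
// Hypothetical stand-in that copies only the shape of three create() overloads.
// It is NOT the real org.apache.kafka.streams.test.ConsumerRecordFactory.
class DemoFactory<K, V> {
    // Overload that takes the key first.
    String create(K key, V value, long timestampMs) {
        return "create(K key, V value, long timestampMs)";
    }

    // Overload that takes the topic name first.
    String create(String topicName, V value, long timestampMs) {
        return "create(String topicName, V value, long timestampMs)";
    }

    // Overload that takes topic name, key, and value.
    String create(String topicName, K key, V value) {
        return "create(String topicName, K key, V value)";
    }
}

class OverloadDemo {
    // With K = String the first two overloads have identical parameter types.
    static String withStringKey() {
        DemoFactory<String, Integer> factory = new DemoFactory<>();
        // factory.create("key", 42, 0L); // does not compile: reference to create is ambiguous
        // Passing topic, key, and value matches only the third overload.
        return factory.create("input-topic", "key", 42);
    }

    // With K = Integer the key-first overload is the only applicable one.
    static String withIntegerKey() {
        DemoFactory<Integer, Integer> factory = new DemoFactory<>();
        return factory.create(7, 42, 0L);
    }

    public static void main(String[] args) {
        System.out.println(withStringKey());
        System.out.println(withIntegerKey());
    }
}
```

Running main prints the label of the overload the compiler selected in each case. The sketch suggests why the problem only shows up with a String key: once K is String, the key-first and topic-first overloads become indistinguishable, and supplying the topic name explicitly selects a different overload.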