i use windows 10 to work with kafka v. 0.10.1.0 - at the moment everything happens locally, what i want to say with that is that i run a local zookeeper along with kafka. my goal is to forward some integers to a single broker kafka consumer.
this is a simplified minimal example of my current code:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.io.IOException;
import java.util.Properties;
public class simple_example {
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
Properties properties = new Properties();
//properties.put("retries", 0);
properties.put("metadata.broker.list", "localhost:9092");
properties.put("serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.required.acks", "1");
Producer<Integer, String> producer = null;
int i = 0;
// 10e5 is approx the amount of events i want to forward in the real life version of this
while (i < 10e5) {
try {
producer = new Producer<Integer, String>(new ProducerConfig(properties));
String topic = "test2";
String msg = "Event Number: "+i;
KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, msg);
producer.send(data);
System.out.println("-----------------------------------------------------------------------");
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
System.out.println(totalTime/(1e3)+" [sec] --> ex/s = " + 1/(totalTime/(1e3)));
System.out.println("i= " + i);
System.out.println("-----------------------------------------------------------------------");
i++;
//producer.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
}
it works fine for the first couple thousands events. but somewhere between 8000-10000 it stops and delivers the following error:
15:23:52,758 INFO kafka.utils.VerifiableProperties - Verifying properties
15:23:52,758 INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to localhost:9092
15:23:52,758 INFO kafka.utils.VerifiableProperties - Property request.required.acks is overridden to 1
15:23:52,758 WARN kafka.utils.VerifiableProperties - Property retries is not valid
15:23:52,758 INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder
15:23:52,758 INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test2)
15:23:52,758 INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing
15:23:52,758 INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092
15:23:52,758 WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(test2)] from broker [id:0,host:localhost,port:9092] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at simple_example.main(simple_example.java:40)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
i suspect that it might have something to do with the properties i send along. Thats why i tried to fix it by additional ones like the one that is commented in order to do such i followed https://kafka.apache.org/documentation.html#brokerconfigs
but for all i changed (apart from the three above in the code) i always get a message of the form:
16:25:39,810 INFO kafka.utils.VerifiableProperties - Verifying properties
16:25:39,810 INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to localhost:9092
16:25:39,810 INFO kafka.utils.VerifiableProperties - Property request.required.acks is overridden to 1
16:25:39,810 WARN kafka.utils.VerifiableProperties - Property retries is not valid
telling me that my modification is somehow not valid.
my questions are
how can i get kafka to work trough all the events?
how should i modify the properties for kafka to understand and accept it?
thanks a lot in advance for potential help, references, comments
best, t.