Kafka Producer stops +/- randomly and does not accept modified Properties

805 Views Asked by At

i use windows 10 to work with kafka v. 0.10.1.0 - at the moment everything happens locally, what i want to say with that is that i run a local zookeeper along with kafka. my goal is to forward some integers to a single broker kafka consumer.

this is a simplified minimal example of my current code:

                import kafka.javaapi.producer.Producer;
                import kafka.producer.KeyedMessage;
                import kafka.producer.ProducerConfig;

                import java.io.IOException;
                import java.util.Properties;

                public class simple_example {


                    public static void main(String[] args) throws IOException {

                        long startTime = System.currentTimeMillis();

                        Properties properties = new Properties();

                        //properties.put("retries", 0);
                        properties.put("metadata.broker.list", "localhost:9092");
                        properties.put("serializer.class", "kafka.serializer.StringEncoder");
                        properties.put("request.required.acks", "1");

                        Producer<Integer, String> producer = null;

                        int i = 0;

                        // 10e5 is approx the amount of events i want to forward in the real life version of this
                        while (i < 10e5) {

                            try {

                                producer = new Producer<Integer, String>(new ProducerConfig(properties));

                                    String topic = "test2";
                                    String msg = "Event Number: "+i;

                                    KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, msg);
                                    producer.send(data);


                                    System.out.println("-----------------------------------------------------------------------");
                                    long endTime   = System.currentTimeMillis();
                                    long totalTime = endTime - startTime;
                                    System.out.println(totalTime/(1e3)+" [sec] --> ex/s = " + 1/(totalTime/(1e3)));
                                    System.out.println("i= " + i);
                                    System.out.println("-----------------------------------------------------------------------");
                                    i++;
                                    //producer.close();
                                    }  catch (Exception e) {
                                          e.printStackTrace();

                                 } finally {
                                    producer.close();
                                }

                    }
                }

                }

it works fine for the first couple thousands events. but somewhere between 8000-10000 it stops and delivers the following error:

                            15:23:52,758 INFO  kafka.utils.VerifiableProperties                              - Verifying properties
                            15:23:52,758 INFO  kafka.utils.VerifiableProperties                              - Property metadata.broker.list is overridden to localhost:9092
                            15:23:52,758 INFO  kafka.utils.VerifiableProperties                              - Property request.required.acks is overridden to 1
                            15:23:52,758 WARN  kafka.utils.VerifiableProperties                              - Property retries is not valid
                            15:23:52,758 INFO  kafka.utils.VerifiableProperties                              - Property serializer.class is overridden to kafka.serializer.StringEncoder
                        15:23:52,758 INFO  kafka.client.ClientUtils$                                     - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test2)
                        15:23:52,758 INFO  kafka.producer.SyncProducer                                   - Connected to localhost:9092 for producing
                        15:23:52,758 INFO  kafka.producer.SyncProducer                                   - Disconnecting from localhost:9092
                        15:23:52,758 WARN  kafka.client.ClientUtils$                                     - Fetching topic metadata with correlation id 0 for topics [Set(test2)] from broker [id:0,host:localhost,port:9092] failed
                        java.nio.channels.ClosedChannelException
                        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
                        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
                        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
                        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
                        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
                        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
                        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
                        at kafka.utils.Utils$.swallow(Utils.scala:172)
                        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
                        at kafka.utils.Utils$.swallowError(Utils.scala:45)
                        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
                        at kafka.producer.Producer.send(Producer.scala:77)
                        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
                        at simple_example.main(simple_example.java:40)
                        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                        at java.lang.reflect.Method.invoke(Method.java:497)
                        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

i suspect that it might have something to do with the properties i send along. Thats why i tried to fix it by additional ones like the one that is commented in order to do such i followed https://kafka.apache.org/documentation.html#brokerconfigs

but for all i changed (apart from the three above in the code) i always get a message of the form:

    16:25:39,810 INFO  kafka.utils.VerifiableProperties - Verifying properties
    16:25:39,810 INFO  kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to localhost:9092
    16:25:39,810 INFO  kafka.utils.VerifiableProperties - Property request.required.acks is overridden to 1
    16:25:39,810 WARN  kafka.utils.VerifiableProperties - Property retries is not valid

telling me that my modification is somehow not valid.

my questions are

  • how can i get kafka to work trough all the events?

  • how should i modify the properties for kafka to understand and accept it?

thanks a lot in advance for potential help, references, comments

best, t.

0

There are 0 best solutions below