How can I implement a Camel Kafka producer and consumer that connect to Kafka brokers with a username and password, but without SSL certificates?

My Kafka broker configuration looks like this:

{
  "kafka_brokers_sasl": [
    "kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093"
  ],
  "user": "**********",
  "password": "********"
}

Answer by Mickael Maison:

To connect to Message Hub (the IBM Bluemix Kafka service these brokers belong to), you need Apache Camel 2.20 or later.

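Then add the following query parameters to your endpoint URI. No client certificate or keystore is involved: SASL_SSL authenticates with the username and password and uses TLS to encrypt the connection.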

  • saslMechanism=PLAIN
  • securityProtocol=SASL_SSL
  • sslProtocol=TLSv1.2
  • sslEnabledProtocols=TLSv1.2
  • sslEndpointAlgorithm=HTTPS
  • saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";
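To make the meaning of these options more concrete, here is a rough sketch of the plain Kafka client properties that, as far as I understand, the Camel options correspond to. The class name SaslPlainClientProps and its build method are hypothetical and not part of Camel or Kafka; only the property keys are standard Kafka client settings.

```java
import java.util.Properties;

/** Hypothetical helper: the Kafka client settings assumed to sit behind the Camel options. */
final class SaslPlainClientProps {

    private SaslPlainClientProps() {
    }

    /** Builds client properties for SASL/PLAIN over TLS, with no keystore or truststore entries. */
    static Properties build(String brokers, String user, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);        // Camel option: brokers
        props.setProperty("security.protocol", "SASL_SSL");     // Camel option: securityProtocol
        props.setProperty("sasl.mechanism", "PLAIN");           // Camel option: saslMechanism
        props.setProperty("ssl.protocol", "TLSv1.2");           // Camel option: sslProtocol
        props.setProperty("ssl.enabled.protocols", "TLSv1.2");  // Camel option: sslEnabledProtocols
        // Camel option: saslJaasConfig. The credentials travel inside the JAAS line.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                        + " username=\"" + user + "\""
                        + " password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute the broker list, user and password from your credentials.
        Properties props = build("broker1:9093,broker2:9093", "USERNAME", "PASSWORD");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Nothing in this set points at a certificate file, which is why the username and password are enough on the client side.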

For example, for a producer:

// Inside RouteBuilder.configure(): comma-separated list built from "kafka_brokers_sasl".
final String brokers = "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093";

// JAAS line for SASL/PLAIN: replace USERNAME and PASSWORD with your "user" and "password" values.
final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";

// Producer: messages sent to direct:in are published to the "test" topic.
from("direct:in")
    .to("kafka:test?"
        + "brokers=" + brokers
        + "&saslMechanism=PLAIN"
        + "&securityProtocol=SASL_SSL"
        + "&sslProtocol=TLSv1.2"
        + "&sslEnabledProtocols=TLSv1.2"
        + "&sslEndpointAlgorithm=HTTPS"
        + "&saslJaasConfig=" + saslJaasConfig);

And for a consumer:

// Inside RouteBuilder.configure(): same broker list and JAAS line as for the producer.
final String brokers = "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093";

final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";

// Consumer: reads the "test" topic as consumer group "mygroup" and writes each message to standard output.
from("kafka:test?"
        + "brokers=" + brokers
        + "&saslMechanism=PLAIN"
        + "&securityProtocol=SASL_SSL"
        + "&sslProtocol=TLSv1.2"
        + "&sslEnabledProtocols=TLSv1.2"
        + "&sslEndpointAlgorithm=HTTPS"
        + "&saslJaasConfig=" + saslJaasConfig
        + "&groupId=mygroup")
    .to("stream:out");
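
Since the producer and consumer URIs share everything except groupId, one option is to build the common part in a small helper. The sketch below is only an illustration: the class KafkaSaslUri and its methods are hypothetical names, not Camel API. It also wraps the JAAS value in Camel's RAW(...) syntax, a variation on the examples above that is meant to stop Camel from URI-decoding characters such as + or & in the password. I have not verified this against every Camel version, so treat it as an assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Camel) that builds the shared Kafka endpoint URI. */
final class KafkaSaslUri {

    private KafkaSaslUri() {
    }

    /** Builds the JAAS line for SASL/PLAIN from the "user" and "password" credentials. */
    static String jaasConfig(String user, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + user + "\""
                + " password=\"" + password + "\";";
    }

    /** Builds a kafka: endpoint URI carrying the SASL_SSL options shown above. */
    static String endpointUri(String topic, List<String> brokers, String user, String password) {
        return "kafka:" + topic
                + "?brokers=" + String.join(",", brokers)
                + "&saslMechanism=PLAIN"
                + "&securityProtocol=SASL_SSL"
                + "&sslProtocol=TLSv1.2"
                + "&sslEnabledProtocols=TLSv1.2"
                + "&sslEndpointAlgorithm=HTTPS"
                // RAW(...) asks Camel to take the value literally (assumption: supported by your Camel version).
                + "&saslJaasConfig=RAW(" + jaasConfig(user, password) + ")";
    }

    public static void main(String[] args) {
        // Broker names copied from the "kafka_brokers_sasl" entry in the question.
        List<String> brokers = Arrays.asList(
                "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093",
                "kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093",
                "kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093",
                "kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093");
        System.out.println(endpointUri("test", brokers, "USERNAME", "PASSWORD"));
    }
}
```

In a route, the producer would then be from("direct:in").to(KafkaSaslUri.endpointUri("test", brokers, user, password)), and the consumer would use the same URI with "&groupId=mygroup" appended. A password containing a double quote or a closing parenthesis would still need extra handling that this sketch does not attempt.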