I am trying to integrate Kafka in my Spark app. Here are the required entries from my POM file:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.stream.kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
</dependency>
Corresponding artifact versions are:
<kafka.version>0.10.2.0</kafka.version>
<spark.stream.kafka.version>2.2.0</spark.stream.kafka.version>
I have been scratching my head over this exception:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
I also tried supplying the jar with the --jars parameter (an invocation along the lines sketched below), but it does not help. What am I missing here?
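A sketch of that attempt (the jar path, main class, and application jar are placeholders):

spark-submit --jars spark-streaming-kafka-0-10_2.11-2.2.0.jar --class com.example.StreamApp my-app.jar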
Code:
private static void startKafkaConsumerStream() {
    Dataset<HttpPackage> ds1 = _spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
            .option("subscribe", HTTP_FED_VO_TOPIC)
            .load() // Getting the error here
            .as(Encoders.bean(HttpPackage.class));

    ds1.foreach((ForeachFunction<HttpPackage>) req -> System.out.print(req));
}
And _spark is defined as:
_spark = SparkSession
        .builder()
        .appName(_properties.getProperty("app.name"))
        .config("spark.master", _properties.getProperty("master"))
        .config("spark.es.nodes", _properties.getProperty("es.hosts"))
        .config("spark.es.port", _properties.getProperty("es.port"))
        .config("spark.es.index.auto.create", "true")
        .config("es.net.http.auth.user", _properties.getProperty("es.net.http.auth.user"))
        .config("es.net.http.auth.pass", _properties.getProperty("es.net.http.auth.pass"))
        .getOrCreate();
My imports are:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
However, when I run my code as mentioned here, i.e. with the --packages option:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

it works.
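For completeness, a sketch of that working invocation (main class and application jar are placeholders):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.example.StreamApp my-app.jar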
Add the following dependency to your pom.xml file. The spark-streaming-kafka-0-10 artifact only provides the old DStream connector; the Structured Streaming source that readStream().format("kafka") looks up lives in spark-sql-kafka-0-10, which is why the --packages run succeeds.
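The entry below mirrors the --packages coordinate that works above, reusing the spark.stream.kafka.version property already defined in the POM (adjust the version to match your Spark build):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.stream.kafka.version}</version>
</dependency>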