Why does spark-submit fail to find kafka data source unless --packages is used?


I am trying to integrate Kafka into my Spark app. Here are the relevant entries from my POM file:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.stream.kafka.version}</version>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
</dependency>

Corresponding artifact versions are:

<kafka.version>0.10.2.0</kafka.version>
<spark.stream.kafka.version>2.2.0</spark.stream.kafka.version>

I have been scratching my head over:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html

I also tried supplying the jar with the --jars parameter, but it does not help. What am I missing here?
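
For reference, my spark-submit invocation with --jars looks roughly like this (the class name, jar names and paths below are placeholders, not the real ones):

spark-submit \
    --class com.example.MyStreamingApp \
    --master local[*] \
    --jars /path/to/the-kafka-jar.jar \
    /path/to/my-spark-app.jar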

Code:

private static void startKafkaConsumerStream() {

        Dataset<HttpPackage> ds1 = _spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                .option("subscribe", HTTP_FED_VO_TOPIC)
                .load() // Getting the error here
                .as(Encoders.bean(HttpPackage.class));

        ds1.foreach((ForeachFunction<HttpPackage>) req -> System.out.print(req));

    }

And _spark is defined as:

_spark = SparkSession
                .builder()
                .appName(_properties.getProperty("app.name"))
                .config("spark.master", _properties.getProperty("master"))
                .config("spark.es.nodes", _properties.getProperty("es.hosts"))
                .config("spark.es.port", _properties.getProperty("es.port"))
                .config("spark.es.index.auto.create", "true")
                .config("es.net.http.auth.user", _properties.getProperty("es.net.http.auth.user"))
                .config("es.net.http.auth.pass", _properties.getProperty("es.net.http.auth.pass"))
                .getOrCreate();

My imports are:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

However, when I run my code with the --packages option:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

it works.


3 Answers

Answer 1:

Add the following dependency to your pom.xml file:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
</dependency>
Answer 2:

Update your dependencies and versions. The dependencies given below should work fine:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

PS: Note the provided scope on the first two dependencies.

Answer 3:

Spark Structured Streaming supports Apache Kafka as the streaming source and sink using the external kafka-0-10-sql module.

The kafka-0-10-sql module is not available by default to Spark applications submitted for execution using spark-submit. The module is external, and to make it available you have to define it as a dependency.

Unless you use kafka-0-10-sql-specific code in your Spark application, you don't have to define the module as a dependency in pom.xml. You simply don't need a compilation dependency on the module, since none of your code uses the module's classes directly. You code against interfaces, which is one of the reasons Spark SQL is so pleasant to use (i.e. it takes very little code to build a fairly sophisticated distributed application).

spark-submit, however, will require the --packages command-line option, which you reported worked fine.

However, when I run my code with the --packages option:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

The reason it worked fine with --packages is that you have to tell the Spark infrastructure where to find the definition of the kafka format.

That leads us to the other "issue" (or rather a requirement) for running streaming Spark applications with Kafka: you have to specify the runtime dependency on the spark-sql-kafka module.

You specify a runtime dependency either with the --packages command-line option (which downloads the necessary jars when you spark-submit your Spark application) or by building a so-called uber-jar (or fat-jar).
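
For example, the --packages route could look roughly like this on the command line (the main class and application jar are placeholders, and the module version should match your Spark version, i.e. 2.2.0 in your case):

spark-submit \
    --class com.example.MyStreamingApp \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
    /path/to/my-spark-app.jar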

That's where pom.xml comes into play (and that's why people offered their help with pom.xml and the module as a dependency).

So, first of all, you have to specify the dependency in pom.xml.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

Last but not least, you have to build an uber-jar, which you configure in pom.xml using the Apache Maven Shade Plugin.

With the Apache Maven Shade Plugin you create an uber-jar that includes all the "infrastructure" for the kafka format to work inside the Spark application jar file. In fact, the uber-jar will contain all the necessary runtime dependencies, so you can spark-submit with the jar alone (and no --packages option or similar).
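
As a sketch, a maven-shade-plugin configuration along the following lines should do (the plugin version is illustrative; the ServicesResourceTransformer matters, because the kafka source is registered through a META-INF/services file that has to survive shading):

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <!-- Merge META-INF/services files so the kafka DataSourceRegister entry survives shading -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
            <filters>
              <!-- Drop signature files that would otherwise invalidate the shaded jar -->
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

With the shaded jar built, spark-submit needs only the jar itself and no --packages option.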