Zeppelin 0.6.2 + Apache Kafka connector for Structured Streaming 2.0.2


I'm trying to run a Zeppelin notebook that contains Spark's Structured Streaming example with the Kafka connector.

> Kafka is up and running on localhost, port 9092

> from the Zeppelin notebook, sc.version returns String = 2.0.2

Here is my environment:

kafka: kafka_2.10-0.10.1.0

zeppelin: zeppelin-0.6.2-bin-all

spark: spark-2.0.2-bin-hadoop2.7

Here is the code in my Zeppelin notebook:

import org.apache.spark.sql.functions.{explode, split}

// Set up the connection to Kafka
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // comma-separated list of host:port
  .option("subscribe", "twitter")                      // comma-separated list of topics
  .option("startingOffsets", "latest")                 // read data from the end of the stream
  .load()
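(For reference, the official word-count example this snippet comes from continues by parsing the Kafka value column and aggregating — that is the only part that uses the explode/split import. A minimal sketch of that continuation, assuming plain-text messages on the topic; the error below is thrown before any of this runs, already at load():)

import spark.implicits._

// Kafka values arrive as binary; cast to string before splitting into words
val words = kafka
  .selectExpr("CAST(value AS STRING)")
  .select(explode(split($"value", " ")).as("word"))

// Running word count, printed to the console on every trigger
val query = words
  .groupBy("word").count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()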

Here is the error I'm getting when I run the notebook:

import org.apache.spark.sql.functions.{explode, split}
java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
  at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
  ... 86 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
  at scala.util.Try$.apply(Try.scala:192)

Any help or advice would be greatly appreciated.

Thnx


There are 2 answers below.


You have probably figured this out already, but I'm putting the answer here for others: the kafka data source for Structured Streaming is provided by the spark-sql-kafka-0-10 package (pick the version matching your Spark and Scala versions, here 2.0.2 and 2.11), which you have to add to conf/zeppelin-env.sh:

export SPARK_SUBMIT_OPTIONS="--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2"

along with potentially other dependencies if you are also using the Kafka client directly (the versions below are for Spark 2.1.0; adjust them to match your installation):

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql_2.11:2.1.0,org.apache.kafka:kafka_2.11:0.10.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0,org.apache.kafka:kafka-clients:0.10.0.1
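Alternatively, Zeppelin 0.6.x can pull dependencies per notebook with the dynamic dependency loader: run a %dep paragraph before the Spark interpreter starts (restart the interpreter first if it is already running). A minimal sketch using the same package:

%dep
z.reset()
z.load("org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2")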

This solution has been tested with Zeppelin 0.10.1.

You need to add your code's dependencies, which can be done through the Zeppelin UI. Go to the Interpreter page (http://localhost:8080/#/interpreter) and, in the spark section under Dependencies, add the artifact for each dependency. If adding spark-sql-kafka alone leads to further dependency errors, add all the packages that spark-sql-kafka needs; you can find them in the Compile Dependencies section of its Maven repository page.

I'm working with Spark 3.0.0 and Scala 2.12, and I was trying to integrate Spark with Kafka. I managed to get past this issue by adding all the artifacts below:

org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
com.google.code.findbugs:jsr305:3.0.0
org.apache.spark:spark-tags_2.12:3.3.0
org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.3.0
org.apache.kafka:kafka-clients:3.0.0
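With those artifacts in place, a quick smoke test such as the following should resolve the kafka source without the ClassNotFoundException (the broker address and topic name here are just the ones from the question):

// If the dependencies resolved correctly, format("kafka") now loads
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "twitter")
  .load()

// Print incoming records to the notebook output
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .outputMode("append")
  .start()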