I'm trying to read data from a Kafka topic using Spark Structured Streaming on an EC2 (Ubuntu) machine. If I read the data with Kafka alone (kafka-console-consumer.sh), there is no issue, but as soon as I try to read the same data with my Spark script, the script just prints the schema I've defined; after that my producer script also starts getting stuck and throwing errors, and then the broker gets stuck as well. I've downloaded all the jars into the spark/jars folder, and the whole setup runs on the same EC2 machine. I tried setting a trigger and maxOffsetsPerTrigger as well (the variant I tried looked roughly like the sketch after consumer.py below), but nothing worked. I also tried the whole setup on a new instance, but the same issue is there.

Downloaded Jars:

  • spark-sql-kafka-0-10_2.12-3.5.0.jar
  • kafka-clients-3.5.1.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.5.0.jar
  • commons-pool2-2.12.0.jar
  • spark-token-provider-kafka-0-10_2.12-3.5.0.jar

Spark Version: 3.5.0, Kafka Version: 3.5.1, Scala Version: 2.12

producer.py (run with: python3 producer.py)

from confluent_kafka import Producer
import csv
import json
import boto3
import time

kafka_config = {'bootstrap.servers': 'ec2_ip:9092'}
s3 = boto3.client('s3')
bucket_name = 'report-bucket'
csv_file = 'test_data.csv'
topic = 'test_topic'

producer = Producer(kafka_config)
csv_obj = s3.get_object(Bucket=bucket_name, Key=csv_file)

# Read the CSV from S3 and parse each row into a dict
csv_data = csv_obj['Body'].read().decode('utf-8').splitlines()
reader = csv.DictReader(csv_data)

print('sending data...')
for row in reader:
    data = json.dumps(row)
    print(data)
    producer.produce(topic, value=data)
    producer.poll(0)  # serve delivery events queued by the client
    time.sleep(2)

producer.flush()  # block until all queued messages are delivered
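To check whether the broker actually acknowledges each message, a delivery-report callback can be attached to produce(). Below is a minimal self-contained sketch; delivery_report is just an illustrative name, and the (err, msg) signature is confluent_kafka's standard callback contract:

from confluent_kafka import Producer

def delivery_report(err, msg):
    # confluent_kafka invokes this from poll()/flush(), once per produced message
    if err is not None:
        print(f'delivery failed: {err}')
    else:
        print(f'delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')

producer = Producer({'bootstrap.servers': 'ec2_ip:9092'})
producer.produce('test_topic', value='{"Name": "test"}', callback=delivery_report)
producer.poll(0)   # serve pending delivery callbacks
producer.flush()   # block until the message is acknowledged or times out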

Errors from the producer script after the consumer script starts:

%5|1696336084.257|REQTMOUT|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: Timed out ProduceRequest in flight (after 61330ms, timeout #0)
%4|1696336085.859|REQTMOUT|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1696336087.252|FAIL|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: 1 request(s) timed out: disconnect (after 81814ms in state UP)
%5|1696336114.556|REQTMOUT|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: Timed out ApiVersionRequest in flight (after 9856ms, timeout #0)
%4|1696336115.042|FAIL|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration) (after 11637ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
%4|1696336116.069|REQTMOUT|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests

consumer.py (run with: spark-submit consumer.py)

from pyspark.sql.types import StructType, StringType, DoubleType
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("Kafka Consumer").getOrCreate()
kafka_broker = "ec2_ip:9092"
kafka_topic = "test_topic"

raw_df = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("subscribe", kafka_topic) \
        .option("startingOffsets", "earliest") \
        .load()

# Kafka delivers the payload as binary; cast it to a JSON string
json_df = raw_df.selectExpr("CAST(value AS STRING)")

sample_schema = StructType() \
    .add("Name", StringType(), True) \
    .add("Department", StringType(), True) \
    .add("Designation", StringType(), True) \
    .add("Salary", DoubleType(), True) \
    .add("Address", StringType(), True)

# Parse the JSON string into a struct column named "sample"
base_df = json_df.select(from_json(col("value"), sample_schema).alias("sample"))
base_df.printSchema()
expanded_df = base_df.select("sample.*")

query = expanded_df.writeStream.outputMode("append").format("console").start()
query.awaitTermination(40000)  # timeout is in seconds in PySpark
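For reference, the trigger/maxOffsetsPerTrigger variant I mentioned looked roughly like this; the 10-second interval, the 100-offset cap, and the checkpoint path are placeholder values for illustration, not exact settings:

# Cap how much Kafka data each micro-batch pulls
raw_df = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("subscribe", kafka_topic) \
        .option("startingOffsets", "earliest") \
        .option("maxOffsetsPerTrigger", 100) \
        .load()

# ... same JSON parsing as above ...

query = expanded_df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("checkpointLocation", "/tmp/kafka_test_ckpt") \
        .trigger(processingTime="10 seconds") \
        .start()
query.awaitTermination(40000)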

After printing the output below, this script gets stuck and doesn't give any error:

Picked up _JAVA_OPTIONS: -Xmx512m
Picked up _JAVA_OPTIONS: -Xmx512m
23/10/04 06:21:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
 |-- value: string (nullable = true)

root
 |-- sample: struct (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Department: string (nullable = true)
 |    |-- Designation: string (nullable = true)
 |    |-- Salary: double (nullable = true)
 |    |-- Address: string (nullable = true)


23/10/04 06:21:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f376c9f0-4842-4681-8fb7-14afddd1a70e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/04 06:21:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+-----------+------+-------+
|Name|Department|Designation|Salary|Address|
+----+----------+-----------+------+-------+
+----+----------+-----------+------+-------+
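For completeness, the equivalent of the kafka-console-consumer.sh check in plain Python (no Spark involved) would look like the sketch below, assuming the same broker and topic; the 'debug-group' group id is an arbitrary name for this check:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'ec2_ip:9092',
    'group.id': 'debug-group',        # arbitrary group id for this check
    'auto.offset.reset': 'earliest',  # read the topic from the beginning
})
consumer.subscribe(['test_topic'])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print(f'consumer error: {msg.error()}')
            continue
        print(msg.value().decode('utf-8'))
finally:
    consumer.close()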
