My Spark Streaming app reads from Kafka using the DStream approach, and I'm trying to get the batch size to process 60,000 messages in 10 seconds.
What I've done:

- Created a topic with 3 partitions
- Set spark.streaming.kafka.maxRatePerPartition = 60000
- Set spark.streaming.backpressure.enabled = true
- Set my batch duration to 10 seconds when I create the StreamingContext
- Running in yarn mode with 2 executors (4 cores total for the 3 partitions)
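For context, here is a minimal sketch of how the stream is created. The broker list, group id, and topic name are placeholders, and it assumes the spark-streaming-kafka-0-10 direct API:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val conf = new SparkConf()
  .setAppName("KafkaCallDEV")
  .set("spark.streaming.kafka.maxRatePerPartition", "60000")
  .set("spark.streaming.backpressure.enabled", "true")

// 10-second batch duration
val ssc = new StreamingContext(conf, Seconds(10))

// Placeholder broker list, group id, and topic name
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "call-dev-group",
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("my-topic"), kafkaParams)
)

stream.foreachRDD { rdd =>
  // actual processing goes here
  println(s"Records in this batch: ${rdd.count()}")
}

ssc.start()
ssc.awaitTermination()
```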
Here is how I test that this is working: I have a producer that sends 60,000 messages at once to the topic. When I check the Spark UI I see the following:
| Batch time | Input size | Processing time |
| --- | --- | --- |
| 10:54:30 | 17610 | 5s |
| 10:54:20 | 32790 | 8s |
| 10:54:10 | 9600 | 3s |
So the batch times are 10 seconds apart, as expected, but what I expected is a single batch with all 60,000 records. Is there some other parameter I'm not setting? From what I've read, with my current settings I should be able to get up to 10 * 60,000 * 3 = 1,800,000 records in a single batch.
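To spell that arithmetic out (this is my reading of the docs, where maxRatePerPartition is messages per second per Kafka partition):

```scala
// My reading of the docs: maxRatePerPartition is messages per SECOND per Kafka partition.
val maxRatePerPartition = 60000L  // spark.streaming.kafka.maxRatePerPartition
val kafkaPartitions     = 3
val batchDurationSec    = 10
val maxRecordsPerBatch  = maxRatePerPartition * kafkaPartitions * batchDurationSec
println(maxRecordsPerBatch)  // 1800000 -- far more than the 60,000-message burst I send
```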
Here is my Spark configuration:

spark.app.id = application_1551747423133_0677
spark.app.name = KafkaCallDEV
spark.driver.cores = 2
spark.driver.extraJavaOptions = -XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc
spark.driver.memory = 3g
spark.driver.port = 33917
spark.executor.cores = 2
spark.executor.extraJavaOptions = -XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc
spark.executor.id = driver
spark.executor.instances = 2
spark.executor.memory = 2g
spark.master = yarn
spark.scheduler.mode = FIFO
spark.streaming.backpressure.enabled = true
spark.streaming.kafka.maxRatePerPartition = 60000
spark.submit.deployMode = cluster
spark.ui.filters = org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.ui.port = 0
spark.yarn.app.container.log.dir = /data0/yarn/container-logs/application_1551747423133_0677/container_1551747423133_0677_01_000002
Below is what I printed out using
logger.info(sparkSession.sparkContext.getConf.getAll.mkString("\n"))
I removed some of the unnecessary entries like server address, app name, etc.
(spark.executor.extraJavaOptions,-XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc)
(spark.yarn.app.id,application_1551747423133_0681)
(spark.submit.deployMode,cluster)
(spark.streaming.backpressure.enabled,true)
(spark.yarn.credentials.renewalTime,1562764821939ms)
(spark.ui.filters,org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)
(spark.executor.memory,2g)
(spark.yarn.credentials.updateTime,1562769141873ms)
(spark.driver.cores,2)
(spark.executor.id,driver)
(spark.executor.cores,2)
(spark.master,yarn)
(spark.driver.memory,3g)
(spark.sql.warehouse.dir,/user/hive/warehouse)
(spark.ui.port,0)
(spark.driver.extraJavaOptions,-XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc)
(spark.executor.instances,2)
(spark.driver.port,37375)
I've also got some Kafka configs that are being printed, so I will post those below too.
org.apache.kafka.clients.consumer.ConsumerConfig:178 - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 60000
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
spark.streaming.kafka.maxRatePerPartition = 60000 does not mean every batch will hold 60,000 records; it is only an upper limit on the receive rate. Look at your own numbers: 17610 + 32790 + 9600 = 60000, so your batch size is achieved, just spread across several batches.

Your 3 Kafka partitions (holding the 60k messages) are read by Spark as chunks/Spark partitions, in your case 3 Spark partitions, and the total number of messages in the 3 Kafka partitions is still 60,000 (17610 + 32790 + 9600). Even when the input flow arrives at a high message rate, back pressure will maintain a uniform rate of messages using the RateLimiter and the PIDRateEstimator.
So you are done here.

For better understanding, refer to my post, Short note on Spark Streaming Back Pressure.

Conclusion: if you enable back pressure, then irrespective of the rate at which you send messages, Spark will allow a roughly constant rate of messages, like in this illustrative, general example: the back pressure properties act as inflow control, a pressure-adjusting screw that maintains a uniform rate of message flow.
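For reference, here is a sketch of the back-pressure knobs the PIDRateEstimator responds to. The values shown are the defaults as I remember them, so verify them against your Spark version before relying on them:

```scala
import org.apache.spark.SparkConf

// Back-pressure related settings (a sketch; defaults as I remember them).
val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  // Rate used before the estimator has any feedback; whether the direct
  // Kafka stream honors it depends on the Spark version.
  .set("spark.streaming.backpressure.initialRate", "60000")
  // PID controller weights used by the PIDRateEstimator.
  .set("spark.streaming.backpressure.pid.proportional", "1.0")
  .set("spark.streaming.backpressure.pid.integral", "0.2")
  .set("spark.streaming.backpressure.pid.derived", "0.0")
  // Lower bound on the estimated rate (records per second).
  .set("spark.streaming.backpressure.pid.minRate", "100")
  // Hard per-partition ceiling, applied on top of whatever the estimator says.
  .set("spark.streaming.kafka.maxRatePerPartition", "60000")
```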