I am trying to insert data into Redis (Azure Cache for Redis) through Spark, using the spark-redis connector. There are around 700 million rows. Some rows are inserted successfully, but after a while some of the tasks start failing with the error below. I am running this from a Jupyter notebook.

Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:205)
    at redis.clients.jedis.util.RedisInputStream.readByte(RedisInputStream.java:43)
    at redis.clients.jedis.Protocol.process(Protocol.java:155)
    at redis.clients.jedis.Protocol.read(Protocol.java:220)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:318)
    at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:236)
    at redis.clients.jedis.BinaryJedis.auth(BinaryJedis.java:2259)
    at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:119)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:819)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:429)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:360)
    at redis.clients.jedis.util.Pool.getResource(Pool.java:50)
    ... 27 more
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.net.SocketInputStream.read(SocketInputStream.java:127)
    at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:199)
    ... 38 more

This is the way I am trying to write data.

df.write
  .format("org.apache.spark.sql.redis")
  .option("host", REDIS_URL)
  .option("port", 6379)
  .option("auth", <PWD>)
  .option("timeout", 20000)
  .option("table", "testrediskeys")
  .option("key.column", "dummy")
  .mode("overwrite")
  .save()
Spark : 3.0
Scala : 2.12
spark-redis: com.redislabs:spark-redis_2.12:2.6.0

1 Answer


I faced the same problem, and the following configuration on my Spark session helped:

val spark = SparkSession.builder()
      .appName("My-lovely-app")
      .master(options.masterSpec)
      .config("spark.redis.host", redisHost)
      .config("spark.redis.port", redisPort)
      .config("spark.redis.auth", redisPass)
      .config("spark.redis.timeout", redisSparkTimeout)
      .config("redis.timeout", redisTimeout)
      .config("spark.redis.max.pipeline.size", redisSparkMaxPipelineSize)
      .getOrCreate()

So you need to increase spark.redis.timeout and redis.timeout to larger values. Setting both to 3600000 ms (1 hour) allowed me to load more than 500 million lists into my Redis Cluster 101. For a massive load such as yours, it is also worth increasing spark.redis.max.pipeline.size.
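
Applied to the write from the question, a minimal sketch might look like the following (assuming the session can be rebuilt in the notebook with the same REDIS_URL and a password variable; the 1-hour timeout and the pipeline size of 10000 are illustrative values, not tuned for your workload):

import org.apache.spark.sql.SparkSession

// Rebuild the session with connector-level connection settings,
// so the per-write "timeout" option is no longer needed.
val spark = SparkSession.builder()
      .appName("redis-bulk-load")                        // hypothetical app name
      .config("spark.redis.host", REDIS_URL)
      .config("spark.redis.port", "6379")
      .config("spark.redis.auth", redisPassword)          // assumed variable holding the cache access key
      .config("spark.redis.timeout", "3600000")           // 1 hour, as suggested above
      .config("redis.timeout", "3600000")
      .config("spark.redis.max.pipeline.size", "10000")   // illustrative; tune for your payload size
      .getOrCreate()

// Connection parameters are picked up from the session config,
// so the write only needs the table and key column.
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "testrediskeys")
  .option("key.column", "dummy")
  .mode("overwrite")
  .save()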