Pyspark socket timeout error. return self._sock.recv_into(b) socket.timeout: timed out

I have written a Spark program (Python 3.6 and Spark 2.3.2) for a collaborative filtering recommendation system that covers 2 cases:

  1. Case 1: Item-based CF recommendation system
  2. Case 2: User-based CF recommendation system with MinHash LSH

I have written train and predict programs that handle both cases. The user-based recommendation works, but when I try to train the model for item-based CF, I get the following error:

2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 238, in main
  File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\serializers.py", line 690, in read_int
    length = stream.read(4)
  File "C:\Users\17372\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

I tried the solutions suggested in this question: Pyspark socket timeout exception after application running for a while

It did not work.

I also found a suggestion to add "--spark.worker.timeout=120" to the spark-submit call, as follows:

bin\spark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120

I still see the same error. I also tried wrapping parts of the code in try/except blocks, but I am not sure how to do that correctly.
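
As far as I understand, Spark properties can also be set from inside the script through SparkConf, so I am considering something like the sketch below. The property names (spark.network.timeout, spark.executor.heartbeatInterval) are real Spark settings, but the values are just guesses on my part:

from pyspark import SparkConf, SparkContext

# Sketch: raise the network/heartbeat timeouts before creating the context.
# The values below are placeholders, not tested recommendations.
conf = SparkConf() \
    .setAppName("task3train") \
    .set("spark.network.timeout", "600s") \
    .set("spark.executor.heartbeatInterval", "60s")
sc = SparkContext(conf=conf)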

What do I do?

My code for Item-based CF:

if model_type == ITEM_BASED_MODEL:
        # group original data by bidx and remove unpopular businesses (rated fewer than 3 times)
        # tuple(bidx, (uidx, score))
        # [(5306, [(3662, 5.0), (3218, 5.0), (300, 5.0),..]), ()
        shrunk_bid_uids_rdd = input_lines \
            .map(lambda kv: (bus_index_dict[kv[1]], (user_index_dict[kv[0]], kv[2]))) \
            .groupByKey().mapValues(lambda uid_score: list(uid_score)) \
            .filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) \
            .mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) \
            .mapValues(lambda val: flatMixedList(val))

        candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)

        # convert shrunk_bid_uids_rdd into dict form
        # dict(bidx: dict(uidx: score))
        # => e.g. {5306: defaultdict(<class 'list'>, {3662: 5.0, 3218: 5.0, 300: 5.0...}),
        bid_uid_dict = shrunk_bid_uids_rdd \
            .map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) \
            .flatMap(lambda kv_items: kv_items.items()).collectAsMap()

        # generate all possible pairs of candidate bidx
        # and compute the Pearson similarity between each pair
        candidate_pair = candidate_bids.cartesian(candidate_bids) \
            .filter(lambda id_pair: id_pair[0] < id_pair[1]) \
            .filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],
                                                  bid_uid_dict[id_pair[1]])) \
            .map(lambda id_pair: (id_pair,
                                  computeSimilarity(bid_uid_dict[id_pair[0]],
                                                    bid_uid_dict[id_pair[1]]))) \
            .filter(lambda kv: kv[1] > 0) \
            .map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],
                             "b2": reversed_index_bus_dict[kv[0][1]],
                             "sim": kv[1]})

There are 2 answers below.

I encountered the same error with Python 3.7 and Spark 2.4.4 running locally. No combination of Spark options helped.

I was reading rows from heavily skewed Parquet files: they contained a binary column whose values ranged from a few bytes to more than 10 MB. The resulting DataFrame had a relatively small number of partitions despite a high setting for spark.default.parallelism; the partition count stayed close to the number of Parquet files I was reading, and I kept getting the socket timeout.

I tried setting spark.sql.files.maxPartitionBytes to a small value, but the error was still there. The only thing that helped was a repartition right after reading the data, which increased the number of partitions and distributed the rows more evenly. Note that this is only an observation; I still cannot explain why the error went away.

If data skew is also a factor here, it could be mitigated by changing your code to:

input_lines \
    .repartition(n) \
    .map(...)

n depends on your cluster and job characteristics, and there is a sweet spot: if n is too low you will still get the socket timeout, and if n is too large it will have a negative effect on performance.
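
Putting this answer together, the read-then-repartition pattern looks roughly like the sketch below; the path, the maxPartitionBytes value and n = 200 are placeholders rather than the exact values from my job:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("skewed-parquet") \
    .config("spark.sql.files.maxPartitionBytes", 32 * 1024 * 1024) \
    .getOrCreate()

# Reading skewed Parquet files: the partition count tends to stay close
# to the number of input files (lowering maxPartitionBytes alone did not help me).
df = spark.read.parquet("/path/to/skewed/files")   # placeholder path
print(df.rdd.getNumPartitions())

# Repartitioning right after the read is what made the timeout disappear for me;
# n is job specific (too low -> socket timeout, too high -> slower shuffles).
df = df.repartition(200)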

Doing a repartition(1) worked for me. I was facing the same issue with very few rows (dummy data). Another alternative that I tried was

.config('spark.default.parallelism', 1)
.config('spark.sql.shuffle.partitions', 1)

but note that the value 1 only made sense because I was testing with dummy data.
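
For completeness, this is roughly how those options are set when building the session; the app name is a placeholder, and again the value 1 is only reasonable for a tiny dummy dataset:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("dummy-data-test") \
    .config("spark.default.parallelism", 1) \
    .config("spark.sql.shuffle.partitions", 1) \
    .getOrCreate()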