Does Spark shuffle write all intermediate data to disk?


Does Spark shuffle write all intermediate data to disk, or only that which will not fit in memory ("spill")?

In particular, if the intermediate data is small, will anything be written to disk, or will the shuffle be performed entirely using memory without writing anything to disk?

I've checked the docs and related StackOverflow questions, but they weren't clear on this precise question.


There are 3 answers below.

Answer 1:

From an AWS guide, https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/#:~:text=In%20Apache%20Spark%2C%20shuffling%20happens,which%20can%20cause%20straggling%20executors (the same point can be found elsewhere):

Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple Spark partitions on different nodes so that you can process a large amount of data in parallel.

In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. During a shuffle, data is written to local disk and transferred across the network. The shuffle operation is often constrained by the available local disk capacity, or data skew, which can cause straggling executors.

That is to say, Spark's architecture writes the mapper output to local disk for the reduce-phase tasks to consume. The size of the data does not matter. I agree certain aspects are not clear.
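
This is easy to confirm empirically. Below is a minimal sketch (the object name, app name, and temp directory are my own illustration, not anything from the AWS guide): it runs a tiny reduceByKey in local mode and then lists the shuffle .data/.index files the map tasks left under spark.local.dir, even though the dataset is only a few pairs.

// Minimal sketch, assuming Spark 3.x on the classpath and a local master.
// TinyShuffleDemo, the app name, and the temp directory are illustrative only.
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object TinyShuffleDemo {

  // Recursively collect files whose names contain "shuffle" (e.g. shuffle_0_0_0.data).
  def shuffleFiles(dir: File): Seq[File] =
    Option(dir.listFiles()).getOrElse(Array.empty[File]).toSeq.flatMap { f =>
      if (f.isDirectory) shuffleFiles(f)
      else if (f.getName.contains("shuffle")) Seq(f)
      else Seq.empty
    }

  def main(args: Array[String]): Unit = {
    val localDir = Files.createTempDirectory("spark-local-demo").toFile
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("tiny-shuffle")
      .config("spark.local.dir", localDir.getAbsolutePath) // shuffle output lands here
      .getOrCreate()

    // A handful of key-value pairs: far smaller than any memory limit.
    val counts = spark.sparkContext
      .parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1), numSlices = 2)
      .reduceByKey(_ + _) // triggers a shuffle
      .collect()
    println(counts.toSeq)

    // The map tasks have already left .data/.index shuffle files on local disk.
    shuffleFiles(localDir).foreach(f => println(s"${f.getPath} (${f.length} bytes)"))

    spark.stop()
  }
}

On a typical run this prints one shuffle_*.data and one shuffle_*.index file per map task, only a few bytes each; that is exactly the mapper output the reduce side reads back.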

Answer 2:

Yes, it does. Even though Apache Spark is often described as "pure in-memory computation", it writes every bit of shuffle intermediate data to disk. The reasons are to save memory resources and to allow recovery: a reduce task can re-fetch map output from disk instead of re-running the whole map stage.

Here is the simplified write logic of Spark 3.0.1, which writes each key-value pair to a stream named objOut:

class ShufflePartitionPairsWriter {

  override def write(key: Any, value: Any): Unit = {
    if (isClosed) {
      throw new IOException("Partition pairs writer is already closed.")
    }
    if (objOut == null) {
      // Lazily open the underlying file-backed stream on the first write.
      open()
    }
    objOut.writeKey(key)
    objOut.writeValue(value)
    recordWritten()
  }

}

Here is how the objOut stream is initialized:

  private def open(): Unit = {
    try {
      partitionStream = partitionWriter.openStream
      timeTrackingStream = new TimeTrackingOutputStream(writeMetrics, partitionStream)
      // Wrap the raw stream with compression/encryption, then serialization.
      wrappedStream = serializerManager.wrapStream(blockId, timeTrackingStream)
      objOut = serializerInstance.serializeStream(wrappedStream)
    } catch {
      case e: Exception =>
        Utils.tryLogNonFatalError {
          close()
        }
        throw e
    }
  }

And here is where objOut ultimately ends up: in the default local-disk shuffle writer, the stream wraps a plain java.io.FileOutputStream:

class FileOutputStream extends OutputStream
{
    public FileOutputStream(File file, boolean append)
        throws FileNotFoundException
    {
        String name = (file != null ? file.getPath() : null);
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkWrite(name);
        }
        if (name == null) {
            throw new NullPointerException();
        }
        if (file.isInvalid()) {
            throw new FileNotFoundException("Invalid file path");
        }
        this.fd = new FileDescriptor();
        fd.attach(this);
        this.append = append;
        this.path = name;

        open(name, append);
    }
}
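
You can also watch these writers in action without reading the source by looking at the shuffle write metrics they record. Here is a rough sketch (the listener wiring and names are my own illustration, not anything Spark ships): even for ten integers, the map tasks report non-zero shuffle bytes written to disk.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

object ShuffleWriteMetricsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("shuffle-write-metrics")
      .getOrCreate()
    val sc = spark.sparkContext

    // Report the shuffle bytes/records each map task writes to local disk.
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val m = taskEnd.taskMetrics
        if (m != null && m.shuffleWriteMetrics.bytesWritten > 0) {
          println(s"task ${taskEnd.taskInfo.taskId}: " +
            s"${m.shuffleWriteMetrics.recordsWritten} records, " +
            s"${m.shuffleWriteMetrics.bytesWritten} shuffle bytes written to disk")
        }
      }
    })

    sc.parallelize(1 to 10, 2).map(i => (i % 2, i)).reduceByKey(_ + _).collect()
    spark.stop()
  }
}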
Answer 3:

The one-line answer to the question is yes, but memory management in Spark 3.0 is better (unified memory management).

Map Phase:

  • During the map phase, each executor writes its output data for a given shuffle partition to local disk storage instead of sending it directly to the reducer.
  • The intermediate data is written as shuffle files on the executor's local disks; multiple local directories can be configured (spark.local.dir) to distribute the I/O load.
  • If the data for a single shuffle partition exceeds the executor's memory limit, it will be spilled to disk in multiple spill files.

Reduce Phase:

  • The reduce tasks fetch their shuffle partitions from the map tasks' local disks, bringing them into memory for processing.

  • The reduce tasks operate on the merged data, performing the necessary computations.
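
To see the difference between the ordinary shuffle write (which always goes to local disk) and an actual spill (which happens only when the in-memory structures hit their limit), you can compare two task metrics Spark already tracks. A hedged sketch with names of my own choosing; for a small job like this one you should see non-zero shuffle bytes written but zero disk bytes spilled:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.sql.SparkSession

object SpillVsShuffleWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("spill-vs-shuffle-write")
      .getOrCreate()
    val sc = spark.sparkContext

    // Per stage: shuffle bytes are written on every shuffle,
    // disk bytes are spilled only when memory runs out.
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stage: SparkListenerStageCompleted): Unit = {
        val m = stage.stageInfo.taskMetrics
        if (m != null) {
          println(s"stage ${stage.stageInfo.stageId}: " +
            s"shuffleBytesWritten=${m.shuffleWriteMetrics.bytesWritten}, " +
            s"diskBytesSpilled=${m.diskBytesSpilled}")
        }
      }
    })

    // Small aggregation: writes shuffle files, but should not spill.
    sc.parallelize(1 to 1000, 4).map(i => (i % 10, i)).groupByKey().count()
    spark.stop()
  }
}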