What is ShuffleQueryStage in a Spark DAG?


What is the ShuffleQueryStage box that I see in Spark DAGs? How is it different from the Exchange box in the Spark stages?



ShuffleQueryStage nodes are tied to AQE (Adaptive Query Execution). One is added after each stage that contains an Exchange; it materializes that stage's results so that the remaining plan can be re-optimized based on runtime statistics.

So, in my opinion, the short answer is:

Exchange - this is where your data is shuffled

ShuffleQueryStage - added for AQE purposes, so that runtime statistics can be used to re-optimize the plan

The example below shows this mechanism.

Here is the sample code:

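import org.apache.spark.sql.functions._
// Needed for toDF outside spark-shell and notebooks, where it is not imported automatically
import spark.implicits._

// Disable broadcast joins so that the join is planned with shuffles
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Toggle this setting to compare the plans with and without AQE
spark.conf.set("spark.sql.adaptive.enabled", true)

val input = spark.read
  .format("csv")
  .option("header", "true")
  .load(
    "dbfs:/FileStore/shared_uploads/**@gmail.com/city_temperature.csv"
  )
val dataForInput2 = Seq(
  ("Algeria", "3"),
  ("Germany", "3"),
  ("France", "5"),
  ("Poland", "7"),
  ("test55", "86")
)
val input2 = dataForInput2
  .toDF("Country", "Value")
  .withColumn("test", lit("test"))
// Join on Country, keep one country, then force one more shuffle with repartition
val joinedDfs = input.join(input2, Seq("Country"))
val finalResult =
  joinedDfs.filter(input("Country") === "Poland").repartition(200)
finalResult.show()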

I am reading the data from a file, but you can replace it with a small DataFrame created in code, because I added a line that disables broadcast joins. I also added a withColumn and a repartition to make the plan more interesting.
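If you don't have the CSV at hand, something like the following should work as a stand-in. It is only a sketch: the rows and the AvgTemperature column are made up for illustration, and the local SparkSession is an assumption (in a notebook, spark already exists).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object SmallInputDemo {
  // Same pipeline as in the answer, but with a made-up in-memory `input`.
  def buildFinalResult(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Disable broadcast joins so that the join still needs shuffles
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
    spark.conf.set("spark.sql.adaptive.enabled", true)

    // Hypothetical stand-in for the CSV file (values are invented)
    val input = Seq(
      ("Poland", "50.1"),
      ("Poland", "48.3"),
      ("France", "61.0")
    ).toDF("Country", "AvgTemperature")

    val input2 = Seq(
      ("Algeria", "3"),
      ("Germany", "3"),
      ("France", "5"),
      ("Poland", "7"),
      ("test55", "86")
    ).toDF("Country", "Value").withColumn("test", lit("test"))

    input
      .join(input2, Seq("Country"))
      .filter(input("Country") === "Poland")
      .repartition(200)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, just for trying this out
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("small-input-demo")
      .getOrCreate()
    buildFinalResult(spark).show()
    spark.stop()
  }
}
```

Because broadcast joins are disabled, even these tiny inputs should be joined with a sort-merge join and shuffles, so the plan should keep the same overall shape as with the CSV.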

First, let's take a look at the plan with AQE disabled:

== Physical Plan ==
CollectLimit (11)
+- Exchange (10)
   +- * Project (9)
      +- * SortMergeJoin Inner (8)
         :- Sort (4)
         :  +- Exchange (3)
         :     +- * Filter (2)
         :        +- Scan csv  (1)
         +- Sort (7)
            +- Exchange (6)
               +- LocalTableScan (5)

Now with AQE enabled:

== Physical Plan ==
AdaptiveSparkPlan (25)
+- == Final Plan ==
   CollectLimit (16)
   +- ShuffleQueryStage (15), Statistics(sizeInBytes=1447.8 KiB, rowCount=9.27E+3, isRuntime=true)
      +- Exchange (14)
         +- * Project (13)
            +- * SortMergeJoin Inner (12)
               :- Sort (6)
               :  +- AQEShuffleRead (5)
               :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1158.3 KiB, rowCount=9.27E+3, isRuntime=true)
               :        +- Exchange (3)
               :           +- * Filter (2)
               :              +- Scan csv  (1)
               +- Sort (11)
                  +- AQEShuffleRead (10)
                     +- ShuffleQueryStage (9), Statistics(sizeInBytes=56.0 B, rowCount=1, isRuntime=true)
                        +- Exchange (8)
                           +- LocalTableScan (7)

The code is the same; the only difference is AQE. But now you can see that a ShuffleQueryStage has popped up after each Exchange.
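To compare the two plans programmatically, one option is to run the query and then search the executed plan's string form for the node name. This is a rough sketch, not the only way to do it: the object name, the tiny DataFrames and the local session are assumptions, and the exact plan text can vary between Spark versions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ShuffleStageCheck {
  // Builds a tiny join; with broadcast disabled, both sides need a shuffle.
  def buildJoin(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val left = Seq(("Poland", 1), ("France", 2)).toDF("Country", "A")
    val right = Seq(("Poland", 7), ("Germany", 3)).toDF("Country", "B")
    left.join(right, Seq("Country"))
  }

  // Runs the query and returns the plan string captured after execution.
  def executedPlanString(spark: SparkSession, aqeEnabled: Boolean): String = {
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    spark.conf.set("spark.sql.adaptive.enabled", aqeEnabled.toString)
    val df = buildJoin(spark)
    // collect() executes df's own plan, so with AQE the plan is finalized here
    df.collect()
    df.queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("shuffle-stage-check")
      .getOrCreate()
    val withAqe = executedPlanString(spark, aqeEnabled = true)
    val withoutAqe = executedPlanString(spark, aqeEnabled = false)
    println(s"AQE on:  has ShuffleQueryStage = ${withAqe.contains("ShuffleQueryStage")}")
    println(s"AQE off: has ShuffleQueryStage = ${withoutAqe.contains("ShuffleQueryStage")}")
    spark.stop()
  }
}
```

Note that show() runs a derived query with a limit on top, which is why the sketch uses collect() on the DataFrame whose plan is inspected.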

Let's take a look at the DAG visualization, as in your example.

First, let's take a look at job 3, which includes the join:

[Image: DAG visualization of job 3]

Then there is job 4, which just reuses what was computed previously but adds an additional fourth stage with a ShuffleQueryStage, similar to your case:

[Image: DAG visualization of job 4]


There is already a nice answer here, but this is just to give you some more info on what this ShuffleQueryStage actually is, by looking at the source code.

What is a Shuffle Query Stage?

If we look at Spark's source code for the ShuffleQueryStageExec case class, we see the following:

case class ShuffleQueryStageExec(
    override val id: Int,
    override val plan: SparkPlan,
    override val _canonicalized: SparkPlan) extends QueryStageExec {
...
}

So ShuffleQueryStageExec extends QueryStageExec. Let's have a look at QueryStageExec then. The code comments are enlightening:

A query stage is an independent subgraph of the query plan. Query stage materializes its output before proceeding with further operators of the query plan. The data statistics of the materialized output can be used to optimize subsequent query stages.

There are 2 kinds of query stages:

  1. Shuffle query stage. This stage materializes its output to shuffle files, and Spark launches another job to execute the further operators.
  2. Broadcast query stage. This stage materializes its output to an array in driver JVM. Spark broadcasts the array before executing the further operators.

So in (very) short, a ShuffleQueryStage is a part of your total query plan whose data statistics can be used to optimize subsequent query stages. This is all part of Adaptive Query Execution (AQE).
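To see both kinds of query stage from the quoted comment, one could run the same small join twice with AQE on: once with broadcast joins disabled and once with the broadcast threshold at its documented default of 10 MB. This is a hedged sketch; the object name and data are made up, and the exact plan depends on the Spark version, but one would expect shuffle stages in the first run and a broadcast stage in the second.

```scala
import org.apache.spark.sql.SparkSession

object QueryStageKinds {
  // Runs a small join with AQE on and returns the plan string after execution.
  def planAfterRun(spark: SparkSession, broadcastThreshold: String): String = {
    import spark.implicits._
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", broadcastThreshold)
    // Hypothetical data, small enough to be broadcast when that is allowed
    val left = Seq(("Poland", 1), ("France", 2)).toDF("Country", "A")
    val right = Seq(("Poland", 7), ("Germany", 3)).toDF("Country", "B")
    val df = left.join(right, Seq("Country"))
    df.collect() // execute so that AQE materializes its query stages
    df.queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("query-stage-kinds")
      .getOrCreate()
    val shufflePlan = planAfterRun(spark, "-1")          // broadcast disabled
    val broadcastPlan = planAfterRun(spark, "10485760")  // 10 MB
    println(s"broadcast off: ShuffleQueryStage = ${shufflePlan.contains("ShuffleQueryStage")}")
    println(s"broadcast on:  BroadcastQueryStage = ${broadcastPlan.contains("BroadcastQueryStage")}")
    spark.stop()
  }
}
```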

How is such a Shuffle Query Stage made?

To get a better feeling of how this all works, we can try to understand how the shuffle query stage is made. The AdaptiveSparkPlanExec case class is the interesting location for this.

There are a bunch of actions (collect, take, tail, execute, ...) that trigger the withFinalPlanUpdate function, which in turn triggers the getFinalPhysicalPlan function. In this function, the createQueryStages function gets called and this is where it gets interesting.

The createQueryStages function is a recursive function that travels through the whole plan tree and it looks a bit like this:

  private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match {
    case e: Exchange =>
      // First have a quick check in the `stageCache` without having to traverse down the node.
      context.stageCache.get(e.canonicalized) match {
        case Some(existingStage) if conf.exchangeReuseEnabled =>
          ...

        case _ =>
          val result = createQueryStages(e.child)
          val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange]
          // Create a query stage only when all the child query stages are ready.
          if (result.allChildStagesMaterialized) {
            var newStage = newQueryStage(newPlan)
            ...
      }

So you see, if we come across an Exchange whose stage is already in the stageCache and exchange reuse is enabled, we simply reuse it. Otherwise, we build a new plan and, once all child query stages are materialized, call the newQueryStage function.

This is where the story ends. The newQueryStage function looks like this:

  private def newQueryStage(e: Exchange): QueryStageExec = {
    val optimizedPlan = optimizeQueryStage(e.child, isFinalStage = false)
    val queryStage = e match {
      case s: ShuffleExchangeLike =>
        ...
        ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized)
      case b: BroadcastExchangeLike =>
        ...
        BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized)
    }
    ...
  }

So there we see the ShuffleQueryStageExec being made! For each Exchange that has not been turned into a query stage yet (or that is not reused from the cache), AQE adds either a ShuffleQueryStageExec or a BroadcastQueryStageExec.
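The recursion described above can be illustrated with a toy model in plain Scala. To be clear, this is not Spark's code: every type here (Plan, Exchange, QueryStage, ToyPlanner, ...) is a hypothetical stand-in, the stage cache is left out, and only the "wrap an Exchange once all child stages are materialized" rule is kept.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's classes.
sealed trait Plan
case class Scan(name: String) extends Plan
case class Filter(child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan
case class Exchange(child: Plan) extends Plan
case class QueryStage(id: Int, exchange: Exchange, materialized: Boolean) extends Plan

case class StageResult(newPlan: Plan, allChildStagesMaterialized: Boolean)

class ToyPlanner {
  private var nextStageId = 0

  // Walks the tree and wraps an Exchange in a QueryStage only when
  // everything below it has already been materialized.
  def createQueryStages(plan: Plan): StageResult = plan match {
    case Exchange(child) =>
      val result = createQueryStages(child)
      val newExchange = Exchange(result.newPlan)
      if (result.allChildStagesMaterialized) {
        val stage = QueryStage(nextStageId, newExchange, materialized = false)
        nextStageId += 1
        // A freshly created stage still has to run before its parents can proceed
        StageResult(stage, allChildStagesMaterialized = false)
      } else {
        StageResult(newExchange, allChildStagesMaterialized = false)
      }
    case stage: QueryStage =>
      StageResult(stage, stage.materialized)
    case Filter(child) =>
      val result = createQueryStages(child)
      StageResult(Filter(result.newPlan), result.allChildStagesMaterialized)
    case Join(left, right) =>
      val l = createQueryStages(left)
      val r = createQueryStages(right)
      StageResult(
        Join(l.newPlan, r.newPlan),
        l.allChildStagesMaterialized && r.allChildStagesMaterialized)
    case scan: Scan =>
      StageResult(scan, allChildStagesMaterialized = true)
  }
}

object ToyPlanner {
  // Pretends that every stage created so far has finished running.
  def markMaterialized(plan: Plan): Plan = plan match {
    case stage: QueryStage => stage.copy(materialized = true)
    case Exchange(child)   => Exchange(markMaterialized(child))
    case Filter(child)     => Filter(markMaterialized(child))
    case Join(l, r)        => Join(markMaterialized(l), markMaterialized(r))
    case scan: Scan        => scan
  }

  def main(args: Array[String]): Unit = {
    val planner = new ToyPlanner
    // Same shape as the example plan: two shuffled join inputs and a final repartition
    val initial = Exchange(Join(Exchange(Filter(Scan("csv"))), Exchange(Scan("local"))))
    val pass1 = planner.createQueryStages(initial)
    println(pass1.newPlan) // the two inner exchanges are wrapped, the outer one is not yet
    val pass2 = planner.createQueryStages(markMaterialized(pass1.newPlan))
    println(pass2.newPlan) // now the outer exchange becomes a stage as well
  }
}
```

In this model the outer Exchange only becomes a stage on the second pass, after the two inner stages are marked as materialized. That mirrors the "Create a query stage only when all the child query stages are ready" comment in the snippet above.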

Hope this brings more insight to what this is :)