What is ShuffleQueryStage in a Spark DAG?
What is the ShuffleQueryStage box that I see in Spark DAGs? How is it different from the Exchange box in Spark stages?
Asked by figs_and_nuts

There is already a nice answer here, but this is just to give you some more info on what this `ShuffleQueryStage` actually is, by looking at the source code.
What is a Shuffle Query Stage?
If we look at Spark's source code for the ShuffleQueryStageExec case class, we see the following:
```scala
case class ShuffleQueryStageExec(
    override val id: Int,
    override val plan: SparkPlan,
    override val _canonicalized: SparkPlan) extends QueryStageExec {
  ...
}
```
So `ShuffleQueryStageExec` extends `QueryStageExec`. Let's have a look at `QueryStageExec` then. The code comments are enlightening:
A query stage is an independent subgraph of the query plan. Query stage materializes its output before proceeding with further operators of the query plan. The data statistics of the materialized output can be used to optimize subsequent query stages.
There are 2 kinds of query stages:
- Shuffle query stage. This stage materializes its output to shuffle files, and Spark launches another job to execute the further operators.
- Broadcast query stage. This stage materializes its output to an array in driver JVM. Spark broadcasts the array before executing the further operators.
So, in (very) short, a `ShuffleQueryStage` is a part of your total query plan that materializes its output to shuffle files, so that the statistics of that output can be used to optimize subsequent query stages. This is all part of Adaptive Query Execution (AQE).
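To make the "statistics" part concrete, here is a deliberately tiny, Spark-free Scala sketch. All names (`AdaptiveJoinSketch`, `StageStats`, `chooseJoin`, the `broadcastThreshold` parameter) are hypothetical and not Spark's API. It only assumes the general AQE idea that a join planned as a sort-merge join can be switched to a broadcast hash join once one of its materialized input stages turns out to be small enough.

```scala
// Hypothetical toy model, not Spark classes: it only illustrates using the
// statistics of materialized query stages to re-plan the next operator.
object AdaptiveJoinSketch {
  // Known only after a query stage has actually written its output.
  final case class StageStats(stageId: Int, sizeInBytes: Long)

  sealed trait JoinStrategy
  case object SortMergeJoin extends JoinStrategy
  case object BroadcastHashJoin extends JoinStrategy

  // Re-plan the join from the measured sizes of its materialized input stages:
  // if the smaller side is at or under the threshold, switch to broadcasting it.
  def chooseJoin(left: StageStats, right: StageStats, broadcastThreshold: Long): JoinStrategy =
    if (math.min(left.sizeInBytes, right.sizeInBytes) <= broadcastThreshold) BroadcastHashJoin
    else SortMergeJoin
}
```

For example, `chooseJoin(StageStats(0, 4L << 20), StageStats(1, 1L << 30), 10L << 20)` picks `BroadcastHashJoin` because the 4 MiB side is under the 10 MiB threshold, while two 1 GiB sides keep `SortMergeJoin`. The point is that the decision uses measured sizes of finished stages rather than estimates made before execution.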
How is such a Shuffle Query Stage made?
To get a better feeling of how this all works, we can try to understand how the shuffle query stage is made. The AdaptiveSparkPlanExec case class is the interesting location for this.
There are a bunch of actions (collect, take, tail, execute, ...) that trigger the `withFinalPlanUpdate` function, which in turn triggers the `getFinalPhysicalPlan` function. In this function, the `createQueryStages` function gets called, and this is where it gets interesting.
The `createQueryStages` function is a recursive function that traverses the whole plan tree; it looks a bit like this:
```scala
private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match {
  case e: Exchange =>
    // First have a quick check in the `stageCache` without having to traverse down the node.
    context.stageCache.get(e.canonicalized) match {
      case Some(existingStage) if conf.exchangeReuseEnabled =>
        ...
      case _ =>
        val result = createQueryStages(e.child)
        val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange]
        // Create a query stage only when all the child query stages are ready.
        if (result.allChildStagesMaterialized) {
          var newStage = newQueryStage(newPlan)
          ...
        }
        ...
    }
  ...
}
```
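So you see, if we encounter an `Exchange` for which an equivalent stage already exists in the `stageCache`, and exchange reuse is enabled, we simply reuse that stage. If that is not the case, we first recurse into the exchange's child, rebuild the exchange on top of the resulting plan, and, once all of its child query stages are materialized, call the `newQueryStage` function.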
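The recursion above can be mimicked without Spark. The following Scala sketch is a simplified, hypothetical model: `Plan`, `Scan`, `Op`, `Exchange`, `Stage`, `Context` and `rounds` are invented for illustration, the canonicalized plan is replaced by a string `key`, and "materializing" a stage just means adding its id to a set. The `rounds` function loosely imitates the loop in `getFinalPhysicalPlan` that keeps calling `createQueryStages` until everything below the final stage is materialized.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free model of the stage-creation recursion.
object CreateStagesSketch {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Op(name: String, children: List[Plan]) extends Plan
  // `key` stands in for `e.canonicalized`: equal keys mean "the same exchange".
  final case class Exchange(key: String, child: Plan) extends Plan
  final case class Stage(id: Int, exchange: Exchange) extends Plan

  final case class Result(newPlan: Plan, allChildStagesMaterialized: Boolean, newStages: List[Stage])

  final class Context(val reuseExchanges: Boolean) {
    var nextStageId: Int = 0
    val stageCache: mutable.Map[String, Stage] = mutable.Map.empty
    val materialized: mutable.Set[Int] = mutable.Set.empty // ids of finished stages
  }

  def createQueryStages(plan: Plan, ctx: Context): Result = plan match {
    case e: Exchange =>
      ctx.stageCache.get(e.key) match {
        // An equivalent stage exists and reuse is on: point at it instead.
        case Some(existing) if ctx.reuseExchanges =>
          Result(existing, ctx.materialized(existing.id), Nil)
        case _ =>
          val child = createQueryStages(e.child, ctx)
          val newExchange = Exchange(e.key, child.newPlan)
          // Wrap the exchange in a stage only once everything below it is materialized.
          if (child.allChildStagesMaterialized) {
            val stage = Stage(ctx.nextStageId, newExchange)
            ctx.nextStageId += 1
            ctx.stageCache(e.key) = stage
            Result(stage, allChildStagesMaterialized = false, newStages = List(stage))
          } else {
            Result(newExchange, allChildStagesMaterialized = false, newStages = child.newStages)
          }
      }
    case s: Stage =>
      Result(s, ctx.materialized(s.id), Nil)
    case Op(name, children) =>
      val results = children.map(c => createQueryStages(c, ctx))
      Result(Op(name, results.map(_.newPlan)), results.forall(_.allChildStagesMaterialized), results.flatMap(_.newStages))
    case s: Scan =>
      Result(s, allChildStagesMaterialized = true, newStages = Nil)
  }

  // Create stages, pretend their jobs finished, and repeat until nothing below
  // the final stage is pending. Returns the ids of the stages created per pass.
  def rounds(plan: Plan, ctx: Context): List[List[Int]] = {
    val created = mutable.ListBuffer.empty[List[Int]]
    var result = createQueryStages(plan, ctx)
    while (!result.allChildStagesMaterialized) {
      val ids = result.newStages.map(_.id)
      created += ids
      ctx.materialized ++= ids
      result = createQueryStages(result.newPlan, ctx)
    }
    created.toList
  }
}
```

With this model, a join over two exchanges yields `List(List(0, 1))`: both stages are created in the first pass. An exchange sitting above another exchange yields `List(List(0), List(1))`: the outer stage only appears in a second pass, once the inner one is materialized, which is the effect of the `allChildStagesMaterialized` check. Two identical exchanges under a union produce one stage with reuse enabled and two without.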
This is where the story ends. The `newQueryStage` function looks like this:
```scala
private def newQueryStage(e: Exchange): QueryStageExec = {
  val optimizedPlan = optimizeQueryStage(e.child, isFinalStage = false)
  val queryStage = e match {
    case s: ShuffleExchangeLike =>
      ...
      ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized)
    case b: BroadcastExchangeLike =>
      ...
      BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized)
  }
  ...
}
```
So there we see the `ShuffleQueryStageExec` being made! So for each `Exchange` that has no reusable stage yet (or for every `Exchange`, if you're not reusing exchanges), AQE will add either a `ShuffleQueryStageExec` or a `BroadcastQueryStageExec`, depending on whether it is a shuffle exchange or a broadcast exchange.
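The dispatch in `newQueryStage` boils down to a pattern match on the exchange type. Here is a hypothetical, Spark-free Scala sketch of just that step: the class names echo Spark's but are not Spark's classes, `StageIds` is an invented stand-in for the `currentStageId` counter, and a plan is reduced to a description string.

```scala
// Hypothetical toy model of newQueryStage's dispatch; not Spark's API.
object NewQueryStageSketch {
  sealed trait Exchange { def child: String }
  final case class ShuffleExchange(child: String) extends Exchange
  final case class BroadcastExchange(child: String) extends Exchange

  sealed trait QueryStage { def id: Int }
  // Output goes to shuffle files; a later job reads it.
  final case class ShuffleQueryStage(id: Int, plan: ShuffleExchange) extends QueryStage
  // Output is collected on the driver and broadcast before the next operators run.
  final case class BroadcastQueryStage(id: Int, plan: BroadcastExchange) extends QueryStage

  // Hands out increasing stage ids, one per created stage.
  final class StageIds {
    private var current = 0
    def next(): Int = {
      val id = current
      current += 1
      id
    }
  }

  def newQueryStage(e: Exchange, ids: StageIds): QueryStage = e match {
    case s: ShuffleExchange   => ShuffleQueryStage(ids.next(), s)
    case b: BroadcastExchange => BroadcastQueryStage(ids.next(), b)
  }
}
```

Calling it first with a `ShuffleExchange` and then with a `BroadcastExchange` on the same `StageIds` gives a `ShuffleQueryStage` with id 0 and a `BroadcastQueryStage` with id 1. Because `Exchange` is sealed, the compiler checks that every exchange kind maps to some stage kind.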
Hope this brings more insight into what this is :)
ShuffleQueryStages are tied to AQE: they are added after each stage that ends in an exchange, and they are used to materialize the results of each stage and to optimize the remaining plan based on runtime statistics.
So, in my opinion, the short answer is:
- Exchange: this is where your data is shuffled.
- ShuffleQueryStage: added for AQE purposes, to use runtime statistics and reoptimize the plan.
In the example below, I am trying to show this mechanism.
Here is the sample code:
I am reading data from a file, but you can replace it with a small DataFrame created in code, because I added a line to disable broadcast joins. I also added some withColumn and repartition calls to make it more interesting.
First, let's take a look at the plan with AQE disabled:
Now with AQE enabled:
The code is the same and the only difference is AQE, but now you can see that a ShuffleQueryStage pops up after each Exchange.
Let's take a look at the DAG visualization, as in your example.
First, let's take a look at job 3, which includes the join:
Then there is job 4, which just reuses what was computed previously but adds an additional 4th stage with a ShuffleQueryStage, similar to your case.