Understanding Query Plan of a Spark SQL Query


I am trying to understand the physical plan of a Spark SQL query. I am using Spark SQL 2.4.7. Below is part of the query plan generated for a large query.

:     +- ReusedQueryStage 16
:        +- BroadcastQueryStage 7
:           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
:              +- AdaptiveSparkPlan(isFinalPlan=true)
:                 +- *(11) HashAggregate(keys=[src_clmorigid#21055], functions=[], output=[src_clmorigid#21055])
:                    +- ShuffleQueryStage 21, true
:                       +- Exchange hashpartitioning(src_clmorigid#21055, 10)
:                          +- *(10) HashAggregate(keys=[src_clmorigid#21055], functions=[], output=[src_clmorigid#21055])
:                             +- *(10) Project [src_clmorigid#21055]
:                                +- *(10) BroadcastHashJoin [tgt_clmorigid#21152], [tgt_clmorigid#20756], Inner, BuildRight
:                                   :- *(10) Project [src_clmorigid#21055, tgt_clmorigid#21152]
:                                   :  +- *(10) Filter (isnotnull(tgt_clmorigid#21152) && isnotnull(src_clmorigid#21055))
:                                   :     +- *(10) FileScan parquet default.vw_exclude_latest_set_frm_clm[src_clmorigid#21055,tgt_clmorigid#21152] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://dm_bucket...
:                                   +- ReusedQueryStage 20
:                                      +- BroadcastQueryStage 6
:                                         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
:                                            +- AdaptiveSparkPlan(isFinalPlan=true)
:                                               +- *(9) HashAggregate(keys=[tgt_clmorigid#20756], functions=[], output=[tgt_clmorigid#20756])
:                                                  +- ShuffleQueryStage 19, true
:                                                     +- Exchange hashpartitioning(tgt_clmorigid#20756, 10)
:                                                        +- *(8) HashAggregate(keys=[tgt_clmorigid#20756], functions=[], output=[tgt_clmorigid#20756])
:                                                           +- *(8) Project [tgt_clmorigid#20756]
:                                                              +- *(8) Filter ((((isnotnull(tgt_clm_line_type_ind#20783) && isnotnull(src_clm_line_type_ind#20686)) 
:                                                                 +- *(8) FileScan parquet default.vw_exclude_latest_set_frm_clm[src_clm_line_type_ind#20686,tgt_clmorigid#20756,tgt_clm_line_type_ind#20783] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://...PushedFilters: [IsNotNull(tgt_clm_line_type_ind), 
                                                                      +- *(41) Project [vw_clm_base_fact_sk#21807, source_system#21808, eff_date#21809, frst_sales_crtn_dt#21810, clmorigid#21811, ... 59 more fields]
                                                                          +- *(41) FileScan parquet default.vw_to_be_merged_data[vw_clm_base_fact_sk#21807,source_system#21808,eff_date#21809,frst_sales_crtn_dt#21810,... 56 more fields], ...

Can anyone please help me answer the following questions:

  • What do the numbers inside the parentheses signify, e.g. *(41), *(8)? Do they represent a stage ID or a WholeStageCodegen ID?

  • I understand that the asterisk '*' represents WholeStageCodegen. What exactly does that mean, and what is its significance for query performance? A few days ago I saw a Spark SQL physical plan that did not contain any asterisks, which means no WholeStageCodegen. Does that mean the SQL query was poorly written and its performance will be suboptimal? What causes Spark not to use WholeStageCodegen, as in that specific query?

  • What does "ReusedQueryStage 20" mean in the above query plan? What does the number 20 signify?

  • What does "BroadcastQueryStage 6" mean in the above query plan? What does the number 6 signify?

  • What does "ShuffleQueryStage 21" mean in the above query plan? What does the number 21 signify?

  • What does "AdaptiveSparkPlan(isFinalPlan=true)" mean in the above query plan?

  • I once saw an execution plan in the SQL tab of the Spark UI that had an operator called "BloomFilter". What does that mean? Is it related to reading a Parquet file? Please explain.
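
To make the numbers asked about above easier to compare side by side, here is a rough Python sketch that groups the operators in a plan's text by their *(N) prefix and lists the numbered query stages. It only parses the printed text and makes no claim about what the numbers mean. The names PLAN, group_by_marker and query_stages are made up for this illustration, and PLAN holds just a few lines copied from the plan above.

```python
import re
from collections import defaultdict

# A few lines copied verbatim from the plan above (illustrative subset only).
PLAN = """\
:     +- ReusedQueryStage 16
:        +- BroadcastQueryStage 7
:                 +- *(11) HashAggregate(keys=[src_clmorigid#21055], functions=[], output=[src_clmorigid#21055])
:                    +- ShuffleQueryStage 21, true
:                          +- *(10) HashAggregate(keys=[src_clmorigid#21055], functions=[], output=[src_clmorigid#21055])
:                             +- *(10) Project [src_clmorigid#21055]
"""

# Matches the "*(N) OperatorName" prefix printed in front of some operators.
MARKED_OP = re.compile(r"\*\((\d+)\)\s+(\w+)")
# Matches "ReusedQueryStage 16", "ShuffleQueryStage 21", and so on.
QUERY_STAGE = re.compile(r"(\w*QueryStage)\s+(\d+)")


def group_by_marker(plan_text):
    """Map each number N seen in a *(N) prefix to the operators carrying it."""
    groups = defaultdict(list)
    for line in plan_text.splitlines():
        match = MARKED_OP.search(line)
        if match:
            groups[int(match.group(1))].append(match.group(2))
    return dict(groups)


def query_stages(plan_text):
    """List (node name, number) pairs for every *QueryStage node, in order."""
    return [(name, int(num)) for name, num in QUERY_STAGE.findall(plan_text)]


if __name__ == "__main__":
    for marker, operators in sorted(group_by_marker(PLAN).items()):
        print(marker, operators)
    for name, number in query_stages(PLAN):
        print(name, number)
```

Running this against the full plan text shows at a glance which operators share the same *(N) number and which query stage numbers appear more than once.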

Questions regarding the Spark UI:

  • In the Stages tab of the Spark UI (Spark SQL 2.4.7), the DAGs often contain boxes labelled "WholeStageCodegen" that hold several operators in the same box. What does this signify in Spark with respect to query performance?

  • The DAGs shown in the Stages tab of the Spark UI do not indicate which part of the actual query they correspond to, since they do not show any table names or similar identifiers. With big queries it therefore becomes very difficult to pinpoint the part of the query that a given DAG covers. Is there any way to identify the exact part of the code that corresponds to a specific stage in the DAG?

Note: I am using Spark 2.4.7

Can anyone please help me answer the above questions? Any help is appreciated. Thanks.
