I am trying to understand the physical plan of a Spark SQL query. I am using Spark SQL v2.4.7. Below is a partial query plan generated for a big query.
: +- ReusedQueryStage 16
: +- BroadcastQueryStage 7
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- AdaptiveSparkPlan(isFinalPlan=true)
: +- *(11) HashAggregate(keys=[src_clmorigid#21055], functions=[], output=[src_clmorigid#21055])
: +- ShuffleQueryStage 21, true
: +- Exchange hashpartitioning(src_clmorigid#21055, 10)
: +- *(10) HashAggregate(keys=[src_clmorigid#21055], functions=[], output=[src_clmorigid#21055])
: +- *(10) Project [src_clmorigid#21055]
: +- *(10) BroadcastHashJoin [tgt_clmorigid#21152], [tgt_clmorigid#20756], Inner, BuildRight
: :- *(10) Project [src_clmorigid#21055, tgt_clmorigid#21152]
: : +- *(10) Filter (isnotnull(tgt_clmorigid#21152) && isnotnull(src_clmorigid#21055))
: : +- *(10) FileScan parquet default.vw_exclude_latest_set_frm_clm[src_clmorigid#21055,tgt_clmorigid#21152] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://dm_bucket...
: +- ReusedQueryStage 20
: +- BroadcastQueryStage 6
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- AdaptiveSparkPlan(isFinalPlan=true)
: +- *(9) HashAggregate(keys=[tgt_clmorigid#20756], functions=[], output=[tgt_clmorigid#20756])
: +- ShuffleQueryStage 19, true
: +- Exchange hashpartitioning(tgt_clmorigid#20756, 10)
: +- *(8) HashAggregate(keys=[tgt_clmorigid#20756], functions=[], output=[tgt_clmorigid#20756])
: +- *(8) Project [tgt_clmorigid#20756]
: +- *(8) Filter ((((isnotnull(tgt_clm_line_type_ind#20783) && isnotnull(src_clm_line_type_ind#20686))
: +- *(8) FileScan parquet default.vw_exclude_latest_set_frm_clm[src_clm_line_type_ind#20686,tgt_clmorigid#20756,tgt_clm_line_type_ind#20783] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://...PushedFilters: [IsNotNull(tgt_clm_line_type_ind),
+- *(41) Project [vw_clm_base_fact_sk#21807, source_system#21808, eff_date#21809, frst_sales_crtn_dt#21810, clmorigid#21811, ... 59 more fields]
+- *(41) FileScan parquet default.vw_to_be_merged_data[vw_clm_base_fact_sk#21807,source_system#21808,eff_date#21809,frst_sales_crtn_dt#21810,... 56 more fields], ...
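For context, my current understanding of the `*(10) BroadcastHashJoin ... BuildRight` step above is that Spark builds an in-memory hash table from the smaller (broadcast) side and streams the larger side through it, probing for matches. A pure-Python sketch of that idea, with made-up rows just for illustration (this is my own mental model, not Spark's actual implementation):

```python
# Conceptual sketch of a broadcast hash join with BuildRight:
# build a hash table from the small (broadcast) side, then
# stream the big side and probe the table for inner-join matches.

def broadcast_hash_join(big_side, small_side, key_big, key_small):
    # Build phase: hash the broadcast (right/small) side by its join key.
    hash_table = {}
    for row in small_side:
        hash_table.setdefault(row[key_small], []).append(row)

    # Probe phase: stream the big side, emitting matching row pairs.
    for row in big_side:
        for match in hash_table.get(row[key_big], []):
            yield {**row, **match}

# Hypothetical rows (keys invented for this example only)
big = [{"tgt_clmorigid": "A", "src_clmorigid": "X"},
       {"tgt_clmorigid": "B", "src_clmorigid": "Y"}]
small = [{"tgt_clmorigid": "A"}]

result = list(broadcast_hash_join(big, small, "tgt_clmorigid", "tgt_clmorigid"))
print(result)  # only the row with tgt_clmorigid == "A" survives the inner join
```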
Can anyone please help me answer the following questions I have:

1. What do the numbers inside the parentheses signify, e.g. *(41), *(8), etc.? Do they represent the Stage ID or the WholeStageCodegen ID?
2. I understand that the asterisk '*' represents WholeStageCodegen. What exactly does that mean, and what is its significance in terms of query performance? A few days back I saw a Spark SQL query physical plan that did not contain any asterisks, which means no WholeStageCodegen. Does that mean the SQL query was poorly written and its performance will therefore be suboptimal? What causes WholeStageCodegen to not be utilized by the Spark optimizer, as in that specific query?
3. What does "ReusedQueryStage 20" mean in the above query plan? What does the number 20 signify?
4. What does "BroadcastQueryStage 6" mean in the above query plan? What does the number 6 signify?
5. What does "ShuffleQueryStage 21" mean in the above query plan? What does the number 21 signify?
6. What does "AdaptiveSparkPlan(isFinalPlan=true)" mean in the above query plan?
7. I once saw an execution plan of a query in the SQL tab of the Spark UI which had an operator called "BloomFilter". What does that mean? Is it something to do with reading a Parquet file? Can you please explain?
A question regarding the Spark UI:

In the Stages tab of the Spark UI (Spark SQL 2.4.7), the DAGs often contain boxes labelled "WholeStageCodegen" which contain several operators in the same box. What does this signify in Spark with respect to query performance? Also, the DAGs shown in the Stages tab do not indicate which part of the actual query they pertain to, since they do not show any specific table names, etc. With big queries it therefore becomes very difficult to pinpoint the exact part of the query that a DAG pertains to. Is there any way to identify the exact part of the code that pertains to a specific stage in the DAG?
Note: I am using Spark 2.4.7
Any help is appreciated. Thanks.