I've read this question in which the OP tried to convert this logical plan:
```
Aggregate [sum(inc(vals#4L)) AS sum(inc(vals))#7L]
+- LocalRelation [vals#4L]
```
To this:
```
Aggregate [sum(inc_val#6L) AS sum(inc(vals))#7L]
+- Project [inc(vals#4L) AS inc_val#6L]
   +- LocalRelation [vals#4L]
```
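For context, here is a minimal sketch of code that could produce the first plan. The UDF body and the data are assumptions; the plan only tells us there is a UDF named inc applied to a column vals:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, udf}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical UDF: the plan only tells us it is named inc and takes vals
val inc = udf((x: Long) => x + 1)

// A local Seq becomes the LocalRelation [vals] in the plan
val df = Seq(1L, 2L, 3L).toDF("vals")

// Prints a logical plan like: Aggregate [sum(inc(vals))] +- LocalRelation [vals]
df.select(sum(inc($"vals"))).explain(extended = true)
```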
I have a couple of questions:
- Why did he want to add the Project operator? What is the advantage of it?
- As far as I know, Project is the operator that represents the SELECT statement, so how could a plan possibly not include a Project operator?
Premise: aggregation involves two steps:
- calculation of a partial aggregate,
- followed by a shuffle and a global (final) aggregate.
Basically, Aggregate is a two-stage operation.
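Both stages are visible in the physical plan of any aggregation; here is a minimal, self-contained sketch (the exact plan text varies across Spark versions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Physical plan of a simple aggregation
spark.range(10).groupBy($"id" % 2).agg(sum($"id")).explain()

// The output shows both stages, roughly:
//   HashAggregate(functions=[sum(id)])               <- global (final) aggregate
//   +- Exchange hashpartitioning(...)                <- shuffle
//      +- HashAggregate(functions=[partial_sum(id)]) <- partial aggregate
```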
Why did he want to add the Project operator? What is the advantage of it?
I can think of two scenarios:
- Codegen enabled: with whole-stage code generation enabled, the code for the entire stage (including the UDF call) runs within a single generated iterator. This can potentially cause memory issues when the UDF is CPU- or memory-intensive, since the aggregation happens within that same iterator.
- Codegen disabled: with code generation disabled, each operator runs in its own iterator. The risk of OOMs or high CPU drops because the aggregation is decoupled from the UDF evaluation.
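To observe the difference, whole-stage codegen can be toggled through the spark.sql.codegen.wholeStage setting. A minimal sketch, reusing the df and inc UDF from the question's snippet:

```scala
// Reuses df, inc, and the spark session from the question's sketch.

// With whole-stage codegen on, a stage's operators (UDF call included) are
// fused into one generated function; they show up prefixed with * in explain().
spark.conf.set("spark.sql.codegen.wholeStage", "true")
df.select(sum(inc($"vals"))).explain()

// With it off, each operator runs in its own iterator, decoupling the UDF
// evaluation from the aggregation.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
df.select(sum(inc($"vals"))).explain()
```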
As far as I know, Project is the operator that represents the SELECT statement, so how could a plan possibly not include a Project operator?
As long as the top-most operator returns the required columns, we don't need an explicit Project. For example, say the top-most operator (say, a Filter) returns 3 columns. If all 3 columns are required as output, no explicit Project is needed. If we require only 2 of the 3, we need an explicit Project, and an optimizer rule takes care of this as part of query planning: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L440
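A minimal sketch of both cases (the column names are made up; it assumes an active SparkSession `spark` with spark.implicits._ imported):

```scala
// Assumes an active SparkSession `spark` with spark.implicits._ imported.
val data = Seq((1, 2, 3)).toDF("a", "b", "c")

// Case 1: all three columns are required downstream, so the optimizer has
// no reason to keep a Project on top of the Filter.
data.filter($"a" > 0).explain(extended = true)

// Case 2: only 2 of the 3 columns are required; the analyzed plan carries a
// Project to prune column c (the optimizer may push the pruning even
// further down, e.g. into the relation itself).
data.filter($"a" > 0).select($"a", $"b").explain(extended = true)
```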