Spark reads all columns when filtering using Scala lambda syntax


This code behaves well: it reads only column i (notice ReadSchema: struct<i:bigint> in the last line of the plan):

import org.apache.spark.sql.Dataset

// Define the case class
case class Foo(i: Long, j: String)

// Create a Dataset of Foo
val ds: Dataset[Foo] = spark.createDataset(Seq(
  Foo(1, "Q"),
  Foo(10, "W"),
  Foo(100, "E")
))

// Filter and cast the column
val result = ds.filter($"i" === 2).select($"i")

// Explain the query plan
result.explain()

// It prints:
//== Physical Plan ==
//*(1) Filter (isnotnull(i#225L) AND (i#225L = 2))
//+- *(1) ColumnarToRow
//   +- FileScan parquet [i#225L] Batched: true, DataFilters: [isnotnull(i#225L), (i#225L = 2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [IsNotNull(i), EqualTo(i,2)], ReadSchema: struct<i:bigint>

However, if I use val result = ds.filter(_.i == 10).map(_.i), the physical plan reads all columns, including j (notice ReadSchema: struct<i:bigint,j:string> in the last line):

//== Physical Plan ==
//*(1) SerializeFromObject [input[0, bigint, false] AS value#336L]
//+- *(1) MapElements $line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8811/2079839768@1028cff, obj#335: bigint
//   +- *(1) Filter $line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8810/7014...
//      +- *(1) DeserializeToObject newInstance(class $line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo), obj#334: $line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo
//         +- *(1) ColumnarToRow
//            +- FileScan parquet [i#225L,j#226] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:bigint,j:string>

Why does Spark handle this differently when I use the Scala syntax _.i inside filter?


There are 2 answers below.

BEST ANSWER

Using _ forces object-level processing, like an RDD, because the Scala lambda in filter/map is opaque to Spark. You can see this in the physical plan as ColumnarToRow followed by DeserializeToObject.

Catalyst has little insight into compiled Scala code; this is a long-standing, well-documented limitation. Semantically, ds.map(_.x) is the same as selecting col("x"), but Catalyst still does not translate the lambda into a column expression.

See https://issues.apache.org/jira/browse/SPARK-14083. The Spark developers are aware of this, but it has not been addressed.
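
In practice, a minimal workaround (a sketch, assuming the same ds: Dataset[Foo] from the question and spark.implicits._ in scope) is to express the predicate and projection as column expressions so Catalyst can see them:

// Lambda version: rows are deserialized to Foo objects first, so the scan
// cannot prune columns or push the filter down.
val lambdaVersion = ds.filter(_.i == 10).map(_.i)

// Column-expression version: Catalyst sees both the filter and the projection.
val columnVersion = ds.filter($"i" === 10).select($"i")

columnVersion.explain()
// When ds is backed by a Parquet file (as in the question's plans), the
// FileScan should show ReadSchema: struct<i:bigint> and
// PushedFilters: [IsNotNull(i), EqualTo(i,10)].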

ANSWER

The DataFrame API (using column references) and the typed Dataset API differ not only in how the plan is described but also in how it is managed.

Typed Dataset transformations are expressed as lambdas, and to the Spark optimizer they are like UDFs: it cannot see what happens inside them, so it cannot prune columns or push down filters. If you want Spark to optimize your query, it is recommended to stick to the DataFrame API (column expressions).
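
If you still need a typed result, one common pattern (a sketch, assuming the same ds: Dataset[Foo] from the question and that spark is the SparkSession, as in spark-shell or a Databricks notebook) is to do the filtering and projection with column expressions and only convert back to a typed Dataset at the end:

import spark.implicits._  // for $"..." and the Long encoder

// Filter and project with column expressions so Catalyst can optimize,
// then convert the single remaining column back to a typed Dataset[Long].
val typedResult: Dataset[Long] =
  ds.filter($"i" === 10)
    .select($"i")
    .as[Long]

typedResult.explain()
// When ds is backed by Parquet (as in the question), the scan should read
// only i and push down the EqualTo(i,10) filter, unlike the lambda-based
// ds.filter(_.i == 10).map(_.i).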