Difference Between Window function and OrderBy in Spark


I have code whose goal is to take the 10M oldest records out of 1.5B records.

I tried to do it with orderBy and it never finished, and then I tried to do it with a window function and it finished after 15 minutes.

My understanding is that with orderBy every executor takes part of the data, sorts it, and passes its top 10M to the final executor. Because 10M > the partition size, we end up passing essentially all of the data to the final executor, and then it takes a very long time to finish.

What I can't understand is how the window solution works. What happens in the shuffle before the single executor starts to run? How does this shuffle help the sort on the single executor finish faster? I would appreciate any help with understanding how the window function works in the background in this case.

This is the code of the window function:

from pyspark.sql import Window
import pyspark.sql.functions as f

df = sess.sql("select * from table")
# A global window: one ordering over the whole dataset, with no partitionBy.
last_update_window = Window.orderBy(f.col("last_update").asc())
df = df.withColumn('row_number', f.row_number().over(last_update_window))
df = df.where(f.col('row_number') <= 1000000000)

This is the code for orderBy:

df = sess.sql("select * from table")
# Global sort plus limit; Spark compiles this to TakeOrderedAndProject.
df = df.orderBy(f.col('last_update').asc()).limit(100000000)

Here is a picture of the plan when executing the window function:

[image: physical plan of the window query]

There is 1 answer below.

Run explain() on both queries, and it will show you the different paths they take.
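
For example (a minimal sketch; df_window and df_orderby are just placeholder names for the two DataFrames built in the question):

# Print the physical plan Spark will actually execute for each approach.
df_window.explain()
df_orderby.explain()

# Pass True for the full set of plans (parsed, analyzed, optimized, physical).
df_window.explain(True)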

The window sends all the data to 1 node. In this case you also use a where clause, which means a shuffle is used to complete the filtering. This seems to be faster than the implementation used by limit when a large number of items is returned, and it's likely faster because of the sheer volume of data. Extra shuffles hurt on small data sets, but when used well on large data sets they spread the load and reduce the time it takes.

== Physical Plan ==
*(2) Filter (isnotnull(row_number#66) && (row_number#66 <= 1000000000))
+- Window [row_number() windowspecdefinition(date#61 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#66], [date#61 ASC NULLS FIRST]
   +- *(1) Sort [date#61 ASC NULLS FIRST], false, 0
      +- Exchange SinglePartition
         +- Scan hive default.table_persons [people#59, type#60, date#61], HiveTableRelation `default`.`table_persons`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [people#59, type#60, date#61]

"Order by" uses a different implementation and does not appear to perform as well under with 'large takes'. It doesn't use as many shuffles but this seems that it doesn't accomplish the work as fast when there is a large number of items to return.

== Physical Plan ==
TakeOrderedAndProject(limit=100000000, orderBy=[date#61 ASC NULLS FIRST], output=[people#59,type#60,date#61])
+- Scan hive default.table_persons [people#59, type#60, date#61], HiveTableRelation `default`.`table_persons`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [people#59, type#60, date#61]

As an aside, for large skewed data sets it is common to add 2 extra shuffles, and this does in fact take less time than not having the extra shuffles. (But again, it would increase the time it takes on small data sets.)
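
To make that concrete, here is a hedged sketch of one such pattern, a salted two-stage aggregation; the key and value columns are hypothetical and not part of the question, and the idea is to trade one skewed shuffle for two better-balanced ones:

import pyspark.sql.functions as f

# Spread a hot key over 32 artificial sub-keys.
salted = df.withColumn("salt", (f.rand() * 32).cast("int"))

# Shuffle 1: partial sums per (key, salt), so no single task owns the hot key.
partial = salted.groupBy("key", "salt").agg(f.sum("value").alias("partial_sum"))

# Shuffle 2: combine the partial sums back into one row per key.
result = partial.groupBy("key").agg(f.sum("partial_sum").alias("total"))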

So what does TakeOrderedAndProject actually do? It uses a take, and for large datasets it sorts the data on disk instead of sorting in memory.
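
Roughly, and only as a conceptual model in plain Python rather than Spark's actual Scala implementation, the take-ordered idea is: each partition keeps at most limit rows, and the per-partition winners are then merged:

import heapq
import itertools

def take_ordered(partitions, limit, key):
    # Keep the limit smallest rows in each partition...
    per_partition = [heapq.nsmallest(limit, part, key=key) for part in partitions]
    # ...then merge the per-partition winners and keep the global top limit.
    merged = heapq.merge(*per_partition, key=key)
    return list(itertools.islice(merged, limit))

# Example: 3 "partitions" of (last_update, row) pairs, keep the 2 oldest rows.
parts = [[(5, "a"), (1, "b")], [(3, "c"), (9, "d")], [(2, "e")]]
print(take_ordered(parts, 2, key=lambda r: r[0]))   # [(1, 'b'), (2, 'e')]

Note how, with a large limit, every partition can contribute up to limit rows before most of them are thrown away; that is the data movement discussed at the end of this answer.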

With your window it does do a shuffle, which uses ranges to sort the data. There are inferences that the further sorting is done in memory, giving you the performance trade-off. (See the inferences referenced below.)

And this is where I think you are getting the payoff. (I'd be interested to know if adding a limit to your existing window speeds things up.)
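
That would look something like the following: the question's window code with a limit appended; whether it actually speeds things up here is exactly the open question above.

last_update_window = Window.orderBy(f.col("last_update").asc())
df_limited = (df.withColumn("row_number", f.row_number().over(last_update_window))
                .where(f.col("row_number") <= 1000000000)
                .limit(1000000000))   # extra limit on top of the row_number filter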

From snippets on Reddit and from digging into the issues and the code, the inference is that the sort is done in memory and spilled to disk only if required.

The other piece that I feel provides a big performance boost is that you use a where clause. Take pulls back as many items from each partition as you have in the limit clause (see the implementation above), only to throw most of them out. where does not pull back any items that fail the filter condition. This movement of data [using limit] is likely where you get the real performance degradation with limit.
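
As a rough back-of-the-envelope model of that claim (the partition count is invented, and this only models the per-partition collection described above, not the window's own single-partition shuffle):

total_rows = 1_500_000_000     # table size from the question
limit = 100_000_000            # limit from the orderBy query
num_partitions = 2_000         # hypothetical partition count
rows_per_partition = total_rows // num_partitions

# Take path: each partition may hand back up to limit rows (capped by its size).
take_worst_case = num_partitions * min(limit, rows_per_partition)

# Window + where: only rows passing row_number <= limit survive the filter.
filtered_rows = limit

print(take_worst_case, filtered_rows)   # 1,500,000,000 vs 100,000,000 here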