I've looked into my job and have identified that I do indeed have a skewed task. How do I determine what the actual value is inside this task that is causing the skew?
My Python Transforms code looks like this:
```python
from transforms.api import Input, Output, transform

@transform(
    ...
)
def my_compute_function(...):
    ...
    df = df.join(df_2, ["joint_col"])
    ...
```
Theory
Skew problems originate from anything that causes an exchange (a shuffle) in your job. Operations that cause exchanges include, but are not limited to:

- joins
- windows
- groupBys

These operations move data across your Executors based on the values found in the columns they operate on. When a DataFrame has many repeated values in the column dictating the exchange, all of those rows end up in the same task, inflating its size.
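To make the routing concrete, here is a minimal plain-Python sketch of hash partitioning, the scheme Spark uses to assign rows to tasks during an exchange. The key names, row counts, and partition count below are made up for illustration:

```python
from collections import defaultdict

def route(rows, num_partitions):
    """Assign each (key, value) row to a partition by hashing its key,
    the way a shuffle routes rows to tasks."""
    partitions = defaultdict(list)
    for key, value in rows:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

# A column with many repeats of "key_1": every copy hashes to the same
# partition, so a single task receives almost all of the data.
rows = [("key_1", i) for i in range(90)] + [("key_2", i) for i in range(5)]
partitions = route(rows, num_partitions=8)
sizes = sorted((len(p) for p in partitions.values()), reverse=True)
```

Because hashing is deterministic, repartitioning alone cannot spread a single hot key across tasks; that is why the hot key itself has to be identified.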
Example
Let's consider the following example distribution of data across two DataFrames, `df1` and `df2`, for your join:
These DataFrames, when joined together on `col_1`, will have the following data distributed across the executors:

- `key_1` from `df1`
- `key_1` from `df2`
- `key_2` from `df1`
- `key_2` from `df2`
- `key_3` from `df2`

If you then look at the counts of input and output rows per task, you'll see that Task 1 (the one holding `key_1`) has far more data than the others. This task is skewed.
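In numbers: the join output for each key is the product of its row counts on the two sides, so a key that is frequent on both sides dominates everything else. The per-key counts below are hypothetical, chosen only to illustrate the arithmetic:

```python
from collections import Counter

# Hypothetical per-key row counts for the two sides of the join.
df1_counts = Counter({"key_1": 90, "key_2": 5})
df2_counts = Counter({"key_1": 10, "key_2": 5, "key_3": 1})

# Output rows per key = rows on the left side x matching rows on the right.
output_rows = {
    key: df1_counts[key] * df2_counts[key]
    for key in df1_counts.keys() | df2_counts.keys()
}
# key_1 yields 90 * 10 = 900 output rows; key_2 only 25; key_3 has no
# match in df1, so it contributes nothing.
```

This product-of-counts is exactly the quantity the analysis below computes per key.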
Identification
The question now becomes: how do we identify that `key_1` is the culprit of the skew, since this isn't directly visible in Spark (the underlying engine powering the join)? Looking at the above example, all we need to know is the actual count of rows per key of the join column.

The easiest way to get these counts is to open the Analysis (Contour) tool in Foundry and build the following analysis:
1. Add `df1` as input to a first path
2. Add a `Pivot Table` board, using `col_1` as the rows and `Row count` as the aggregate
3. Click the `⇄ Switch to pivoted data` button
4. Use the `Multi-Column Editor` board to keep only `col_1` and the `COUNT` column. Prefix each of them with `df1_`, so the path outputs only `df1_col_1` and `df1_COUNT`
5. Add `df2` as input to a second path
6. Add a `Pivot Table` board, again using `col_1` as the rows and `Row count` as the aggregate
7. Click the `⇄ Switch to pivoted data` button
8. Use the `Multi-Column Editor` board to keep only `col_1` and the `COUNT` column. Prefix each of them with `df2_`, so the path outputs only `df2_col_1` and `df2_COUNT`
9. Create a third path, using the result of the first path (`df1_col_1` and `df1_COUNT`)
10. Add a `Join` board, making the right side of the join the result of the second path (`df2_col_1` and `df2_COUNT`). Ensure the join type is `Full join`, and add all columns from the right side (you don't need to add a prefix; all the columns are unique)
11. Configure the join board to join on `df1_col_1` equals `df2_col_1`
12. Add an `Expression` board to create a new column, `output_row_count`, which multiplies the two `COUNT` columns together
13. Add a `Sort` board that sorts on `output_row_count` descending

If you now preview the resultant data, you will have a sorted list of the keys from both sides of the join that are causing the skew.