Spark > 3.0
DBR 13.2
I have a handful of large delta tables, all joined on the same unique integer id.
Loan
| Unified_ID | Loan_Number | Expiration_Date |
|---|---|---|
| 1 | ABC | 2023-01-01 |
| 2 | CDE | 2023-01-02 |
| 3 | EFG | 2023-01-03 |
| … | | |
| 80000000 | ZZZ | 2023-01-04 |
Collection
| Unified_ID | Collection_Attempt | Collection_Date |
|---|---|---|
| 1 | ABC | 2023-01-01 |
| 80000000 | ZZZ | 2023-01-04 |
I'm having trouble with long run times; the jobs appear to be bottlenecked by some egregious shuffle and sort steps. For the purposes of this question, please ignore the model design aspect.
Can the shuffle and sort steps be avoided with a careful choice of partitioning and initial write order? For example, say I partition every table into chunks of 1 million on the sequential id.
So, for all tables I would have something like the following (a write sketch follows this table):
| Partition Number | id_lower_bound | id_upper_bound |
|---|---|---|
| 1 | 1 | 999999 |
| 2 | 1000000 | 1999999 |
| … | | |
| 7000 | 6999000000 | 6999999999 |
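To make the write side concrete, here is a minimal Spark SQL sketch of what I mean; the table names (loan_source, loan_partitioned) and the derived id_bucket column are placeholders, not my real schema:

```sql
-- Sketch: derive a coarse bucket from the sequential id and use it as the
-- Delta partition column. DISTRIBUTE BY groups each bucket into one write
-- task, and SORT BY orders rows by unified_id within it.
-- loan_source, loan_partitioned, and id_bucket are hypothetical names.
CREATE TABLE loan_partitioned
USING delta
PARTITIONED BY (id_bucket)
AS SELECT *,
          CAST(unified_id DIV 1000000 AS INT) AS id_bucket
   FROM loan_source
   DISTRIBUTE BY id_bucket
   SORT BY unified_id;
```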
If I initially sort the data on write by unified_id and Z-order by unified_id, can I make sure that when each table is read, each executor holds matching id ranges, like this (a Z-order sketch follows the table):
| Executor | Contents |
|---|---|
| 1 | Loan: {1,2,3,…} Collections: {1,2,3,…} |
| 2 | Loan: {5000,5001,...} Collections: {5000,5001,...} |
| … | |
| 8 | Loan: {12000,12001} Collections: {12000,12001} |
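The Z-ordering step I'm referring to would be the standard Databricks command, run after the initial write (again using the hypothetical table names from the sketch above):

```sql
-- Sketch: co-locate data files by unified_id after the initial load.
-- On Databricks/Delta this is done with OPTIMIZE ... ZORDER BY.
OPTIMIZE loan_partitioned ZORDER BY (unified_id);
OPTIMIZE collection_partitioned ZORDER BY (unified_id);
```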
That way, when a query like
select *
from loan
inner join collections
  on loan.unified_id = collections.unified_id
runs, all rows that satisfy the join condition are already on the same executor, and there is no need for an additional shuffle or sort?
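One way to verify this is to inspect the physical plan with EXPLAIN FORMATTED and look for Exchange and Sort operators feeding the join:

```sql
-- Sketch: inspect the physical plan of the join. Exchange (shuffle) and
-- Sort nodes above the table scans are the steps in question.
EXPLAIN FORMATTED
SELECT *
FROM loan
INNER JOIN collections
  ON loan.unified_id = collections.unified_id;
```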
edit: I have of course found this answer: How to control preferred locations of RDD partitions?. But my concern is that reading the data in as Delta, converting to an RDD to coerce the partition locations, and then converting back would defeat the purpose of the overhead reduction I'm aiming at.