Use interleave to speed up hash repartitioning #15768
Dandandan wants to merge 11 commits into apache:main
Conversation
let batch = interleave_record_batch(&b, &indices)?;
Probably an api like apache/arrow-rs#7325 would be even faster (avoiding one level of "trivial" indexing).
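As a toy illustration of what the `interleave_record_batch` call quoted above does, here is a stdlib-only sketch (plain `Vec`s stand in for arrow columns; real code calls arrow's kernel): rows are gathered from several source batches into one output, driven by `(batch_idx, row_idx)` pairs.

```rust
// Toy model of interleave: pick rows out of several source "batches"
// in the order given by (batch_idx, row_idx) index pairs.
// This is NOT arrow's implementation, just the access pattern it performs.
fn interleave(batches: &[&Vec<i64>], indices: &[(usize, usize)]) -> Vec<i64> {
    indices.iter().map(|&(b, r)| batches[b][r]).collect()
}

fn main() {
    let b0 = vec![10, 11];
    let b1 = vec![20, 21];
    // take row 1 of batch 0, row 0 of batch 1, row 0 of batch 0
    let out = interleave(&[&b0, &b1], &[(0, 1), (1, 0), (0, 0)]);
    assert_eq!(out, vec![11, 20, 10]);
    println!("{out:?}");
}
```

An API like the one proposed in apache/arrow-rs#7325 would fuse this indexing with the copy itself, avoiding the extra level of indirection.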
Nice change, and a much clearer performance win than #15479. I expect (without testing) that these two PRs interact negatively with one another - Removing coalesce will mean that the data is "more scattered" in memory and probably make interleave work worse - as well as the computation of the left join keys.
I think removing coalesce after this change (for all hash repartitions) might be possible, as the output batch size will be roughly equal to input batch size (instead of roughly 1/partitions * batch_size). Unless hash values are somehow skewed (but this is currently also not good anyway).
A future API could perhaps use your take_in API to only output rows once the batch size has been reached.
I see I had misunderstood this PR. It makes a lot of sense to do this. As part of prototyping the integration of a take_in API in datafusion, I made a similar change - move the buffering before sending the small batches to their destination thread. I don't remember seeing as much speedup when I benchmarked that change independently - I guess using interleave instead of a take/concat combo (like I did back then) makes a significant difference. Awesome!
Yeah the speedup comes from avoiding copying the data a second time in concat / CoalesceBatches. So when using take_in we should be careful to use it once (for a single destination batch) to avoid doing the concat on the small batches
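The "use it once per destination batch" point can be sketched in plain Rust (a hypothetical helper, not DataFusion code): per-partition indices accumulate until a full batch's worth is ready, and only then is the batch handed off, so each row is copied exactly once at interleave time instead of again in a later concat.

```rust
// Hedged sketch: buffer (batch_idx, row_idx) pairs for one partition and
// hand back a full batch's worth of indices once `batch_size` is reached.
// The caller would then do a single interleave into one output batch,
// avoiding a second copy in a CoalesceBatches/concat step.
fn push_row(
    buffered: &mut Vec<(usize, usize)>,
    idx: (usize, usize),
    batch_size: usize,
) -> Option<Vec<(usize, usize)>> {
    buffered.push(idx);
    if buffered.len() >= batch_size {
        // take the full buffer, leaving an empty one behind
        Some(std::mem::take(buffered))
    } else {
        None
    }
}

fn main() {
    let mut buf = Vec::new();
    let mut emitted = 0;
    for i in 0..10 {
        if push_row(&mut buf, (0, i), 4).is_some() {
            emitted += 1;
        }
    }
    assert_eq!(emitted, 2); // two full batches of 4 rows each
    assert_eq!(buf.len(), 2); // 2 rows still buffered
}
```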
Changed the title to: interleave in hash repartitioning
Pull Request Overview
This PR improves DataFusion’s hash repartitioning by using arrow's interleave_record_batch, refactors the partitioning interface to work with multiple batches, and adds a buffering mechanism in the repartition executor.
- Changes the partition and partition_iter functions to accept a Vec of batches instead of a single batch.
- Replaces the use of take_arrays with interleave_record_batch to build repartitioned batches efficiently.
- Updates the RepartitionExec logic to buffer input batches based on the partitioning mode.
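The multi-batch partitioning step described above can be illustrated with a stdlib-only sketch (a toy stand-in, not DataFusion's actual code; `i64` keys stand in for arrow arrays, and `DefaultHasher` stands in for DataFusion's hash functions): each row of each buffered batch is hashed to a partition, producing the per-partition `(batch_idx, row_idx)` lists that `interleave_record_batch` consumes.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy hash repartitioning: map every row of every buffered "batch" to a
// partition, collecting (batch_idx, row_idx) pairs per partition.
fn partition_indices(batches: &[Vec<i64>], num_partitions: usize) -> Vec<Vec<(usize, usize)>> {
    let mut parts = vec![Vec::new(); num_partitions];
    for (batch_idx, batch) in batches.iter().enumerate() {
        for (row_idx, key) in batch.iter().enumerate() {
            let mut h = DefaultHasher::new();
            key.hash(&mut h);
            let p = (h.finish() as usize) % num_partitions;
            parts[p].push((batch_idx, row_idx));
        }
    }
    parts
}

fn main() {
    let batches = vec![vec![1, 2, 3], vec![4, 5]];
    let parts = partition_indices(&batches, 2);
    // every input row lands in exactly one partition
    let total: usize = parts.iter().map(|p| p.len()).sum();
    assert_eq!(total, 5);
    println!("{parts:?}");
}
```

Because the indices span several input batches, each output batch is roughly input-sized rather than roughly `1/partitions * batch_size`.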
Comments suppressed due to low confidence (1)
datafusion/physical-plan/src/repartition/mod.rs:316
- [nitpick] The variable name `b` is not descriptive. Consider renaming it to something like `batch_refs` or `batches_slice` to improve code readability.
let b: Vec<&RecordBatch> = batches.iter().collect();
Does the (datafusion/physical-plan/src/repartition/mod.rs, lines 921 to 926 in 91870b6)
FYI @alamb, it would be nice to have some benchmarks run. This improves on a long-standing issue in DataFusion.
Changed the title: interleave in hash repartitioning → interleave to speed up hash repartitioning
@alamb could you run the benchmarks, maybe?
Hmmm... I think the speedup actually doesn't come from using

closing for now
Which issue does this PR close?
Addresses #7957, #6822, #7001
Rationale for this change
Saves work in CoalesceBatches (less concat).
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?