feat: Allow cancelling of grouping operations which are CPU bound #16196
Conversation
…io MPSC (RecordBatchReceiverStream)
The PR is limited to solving aggregates with no grouping in streaming; we can extend it to more cases if it doesn't affect performance?
Thanks @zhuqi-lucas -- I'll try running the cancellation benchmark from @carols10cents
🤖: Benchmark completed
Thank you @alamb for the review and benchmark. I am wondering if it will hurt DataFusion's own running performance, because we add (if logic) in the aggregate and other core execs. If we only want to support datafusion-cli cancellation logic, maybe we can add the wrapper logic to datafusion-cli. But from other related issues, it seems some customers use gRPC to drop the stream, not only limited to datafusion-cli. Maybe the perfect solution is:
I polished the code so it only affects the no-grouping aggregate. Maybe we can compare on ClickBench, so we can be confident to merge if it doesn't affect aggregate performance.
Updated the performance for the current PR:

SET datafusion.execution.target_partitions = 1;
SELECT SUM(value)
FROM range(1,50000000000) AS t;
+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.315 seconds.

The main branch:

SET datafusion.execution.target_partitions = 1;
SELECT SUM(value)
FROM range(1,50000000000) AS t;
+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.567 seconds.

No performance regression from the above testing.
@@ -77,6 +77,11 @@ impl AggregateStream {
    let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
    let input = agg.input.execute(partition, Arc::clone(&context))?;

    // Only wrap no-grouping aggregates in our YieldStream
In my own testing with partition_count = 1, group by aggregates suffer from the same problem.
Thank you @pepijnve for the review, I will try to reproduce it for group by aggregates.
You are right, I can reproduce it now:
SELECT
(value % 10) AS group_key,
COUNT(*) AS cnt,
SUM(value) AS sum_val
FROM range(1, 5000000000) AS t
GROUP BY (value % 10)
ORDER BY group_key;
Thank you @pepijnve, I also added the grouping support in the latest PR.
From the testing results, it seems the group-by cases have some performance regression.
Another solution is using CoalescePartitionsExec as a wrapper:

diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 114f83068..ffb24463e 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
0 => internal_err!(
"CoalescePartitionsExec requires at least one input partition"
),
- 1 => {
- // bypass any threading / metrics if there is a single partition
- self.input.execute(0, context)
- }
+ // 1 => {
+ // // bypass any threading / metrics if there is a single partition
+ // self.input.execute(0, context)
+ // }
_ => {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
// record the (very) minimal work done so that
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8be..8bb8b2145 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -963,8 +963,7 @@ pub fn execute_stream(
) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
- 1 => plan.execute(0, context),
- 2.. => {
+ 1.. => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
// CoalescePartitionsExec must produce a single partition
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd7..107b36603 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0
Hi @alamb, I believe we can also run the ClickBench benchmark for this PR. But I am not confident about the result, since it seems we will always add some overhead to the aggregate. Thanks!
🤖: Benchmark completed
Running the benchmarks again to gather more details
Thank you @alamb, it's surprising that performance has no regression, and is even faster for clickbench_partitioned. It may be because we yield for each running partition, which makes partition execution more efficient.
Thank you @pepijnve, @ozankabak, I will try to polish the code and make all CI tasks green!
My thinking was that we could use EmissionType to insert the yield wrapper closer to where it's needed rather than at the leaves. So if you have a pipeline-breaking operator, rather than adding yield as a parent of the leaves, you would add it as a parent of the children of the node with emission type final.
I'll admit that I didn't work out all the possible scenarios, but my reasoning is that if you have more than one pipeline-breaking operator in a chain, this will work more reliably, since you're 'fixing' each of them rather than injecting yield points at the leaves and hoping this propagates through the entire chain.
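For illustration, here is a hypothetical plan (not one from this PR's tests) with a single pipeline breaker, showing where each strategy would place the wrapper:

AggregateExec (EmissionType::Final)   <- pipeline breaker
  FilterExec
    DataSourceExec                    <- leaf

leaf wrapping:           YieldStreamExec inserted directly above DataSourceExec
emission-type wrapping:  YieldStreamExec inserted directly below AggregateExec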
That makes sense and is a smart solution! I will try to address it.
I don't think this is a good idea. We can have many operators in a plan that break the pipeline, especially in plans with joins and windowing (cascades of windowing operators are quite common). Simply adding a yield wrapper above each of them does not scale. Inserting it as a parent of leaf nodes, and only when necessary (first item in my message above), gives us a system where the least number of necessary yield points is inserted.
I agree with this now. I actually tried using EmissionType to insert the YieldExec wrapper closer to where it's needed instead of just at the leaves. However, during SQL logic test runs with complex plans, it caused many errors, including stack overflows. I believe this approach introduces more edge cases and makes the behavior harder to reason about. |
        Poll::Ready(Some(Ok(batch)))
    }
}
other => other,
When the child stream returns Pending, it's also OK to reset the batches_processed counter to zero.
Good suggestion, thank you @pepijnve!
    }
}

impl ExecutionPlan for YieldStreamExec { |
Should YieldStreamExec implement more of the ExecutionPlan trait? I'm not an expert on these things, but here's my assessment from going through the functions quickly:
- maintains_input_order can return true
- supports_limit_pushdown can, I think, return true
- statistics should probably just delegate to the child
- cardinality_effect can return Equal
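Assuming YieldStreamExec keeps its wrapped plan in a child field (the name is illustrative, not from the PR), the suggested delegations might look like this fragment; the remaining required ExecutionPlan methods (as_any, properties, children, with_new_children, execute, ...) are omitted:

impl ExecutionPlan for YieldStreamExec {
    // Batches pass through unchanged, so input order is preserved.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    // Yielding never changes row counts, so limits can be pushed through.
    fn supports_limit_pushdown(&self) -> bool {
        true
    }

    // Statistics are exactly those of the wrapped child.
    fn statistics(&self) -> Result<Statistics> {
        self.child.statistics()
    }

    // Emits exactly the rows it receives.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    // ...remaining required methods omitted...
}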
Addressed in the latest PR, and also fixed the tests. Thanks!
@@ -33,12 +33,12 @@ use std::borrow::Cow;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::AggregateExec; |
Consider reverting this change and the change in row_hash.rs to keep those files out of the commit?
Good suggestion, thank you @pepijnve!
I tried the format command and it seems to generate the same result, so we can keep it here.
    plan: Arc<dyn ExecutionPlan>,
    has_pipeline_breaking_above: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
    let children = plan.children();
Is it worth trying to use the TreeNode transformation facilities here? Something like below. It's just as much code, but makes use of the facilities provided by the crate.
fn wrap_leaves(
plan: Arc<dyn ExecutionPlan>,
) -> common::Result<Transformed<Arc<dyn ExecutionPlan>>> {
if plan.children().is_empty() {
// Leaf node: wrap it in `YieldStreamExec` and stop recursing
Ok(Transformed::new(Arc::new(YieldStreamExec::new(plan)), true, TreeNodeRecursion::Jump))
} else {
Ok(Transformed::no(plan))
}
}
fn wrap_leaves_of_pipeline_breakers(
plan: Arc<dyn ExecutionPlan>,
) -> common::Result<Transformed<Arc<dyn ExecutionPlan>>> {
let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final;
if is_pipeline_breaker {
let mut transformed = plan.transform_down(wrap_leaves)?;
transformed.tnr = TreeNodeRecursion::Jump;
Ok(transformed)
} else {
Ok(Transformed::no(plan))
}
}
impl PhysicalOptimizerRule for WrapLeaves {
fn optimize(&self, plan: Arc<dyn ExecutionPlan>, _: &ConfigOptions) -> common::Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(wrap_leaves_of_pipeline_breakers).data()
}
}
Good suggestion! I will try to address this.
Addressed in the latest PR.
I think this might still be insufficient for something like this.
The filter + coalesce is there to ensure the … What I think can happen here is that … I did not verify this yet, but it might be worth adding a test case for this type of situation as well.
Agreed, let's add a test for this. I don't think we'll have a problem, but maybe I'm wrong and it is easy to verify.
Working on it and it seems to get cancelled indeed. I'll work on understanding why and report back. |
) -> Poll<Option<Self::Item>> {
    let this = &mut *self;

    if let Some(batch) = this.buffer.take() {
I think you can make this even simpler by moving this block
if this.batches_processed >= YIELD_BATCHES {
this.batches_processed = 0;
cx.waker().wake_by_ref();
Poll::Pending
}
all the way to the beginning of the function. That way there's no need for the buffer and you'll still get the desired effect of returning Pending regularly.
I did not want to add the buffer either, but if we remove it, the query generates a wrong result. Here is the reproducer case:
SELECT SUM(value)
FROM range(1,500000000) AS t;
+--------------------+
| sum(t.value) |
+--------------------+
| 123047621195087232 |
+--------------------+
1 row(s) fetched.
Elapsed 0.248 seconds.
Right result from main branch:
SELECT SUM(value)
FROM range(1,500000000) AS t;
+--------------------+
| sum(t.value) |
+--------------------+
| 124999999750000000 |
+--------------------+
1 row(s) fetched.
Elapsed 0.319 seconds.
So I added the buffer logic (without it, a batch already pulled from the inner stream was dropped when returning Pending), and the test cases now generate the right answer.
Got it now, thank you @pepijnve, we can remove the buffer if we change it to:
// Stream<Item = Result<RecordBatch>> to poll_next_unpin
impl Stream for YieldStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Check the counter *before* polling the inner stream, so a batch
        // is never pulled and then lost by returning Pending.
        if self.batches_processed >= YIELD_BATCHES {
            self.batches_processed = 0;
            cx.waker().wake_by_ref(); // make sure we get polled again
            return Poll::Pending;
        }

        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                self.batches_processed += 1;
                Poll::Ready(Some(Ok(batch)))
            }
            // The input yielded on its own; restart the counting window.
            Poll::Pending => {
                self.batches_processed = 0;
                Poll::Pending
            }
            other => other,
        }
    }
}
Addressed in the latest PR.
@ozankabak I was able to get another non-exciting plan as follows. It's a contrived example, but it does demonstrate how … While I was at it, I added a child-of-pipeline-breaker wrapping test as well, which is capable of cancelling this particular plan.
Hmm, cool example! I want to understand exactly what is going on. I have a hunch this might be due to … BTW, if we can't find a way to solve this problem by only inserting just a few …
Agreed on the rule approach. Interleave will poll each of its children at most once per poll call. If none of the children returns Ready, it will return Pending itself. Each poll starts at a random child index. By varying when the yields occur in each child, there's a very high likelihood that at least one child will return Ready. As a consequence, the aggregate sees an always-ready stream despite the yields being injected.
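That polling pattern could be sketched roughly like this (illustrative only; it assumes boxed record-batch streams and is not the actual InterleaveExec code):

use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use futures::{Stream, StreamExt};

type ChildStream = Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>;

// Poll every child at most once, starting at a rotating/random index.
// The first Ready child wins, so a Pending (an injected yield) from any
// other child is silently swallowed; Pending only reaches the parent
// when *all* children yield within the same poll.
fn poll_children(
    children: &mut [ChildStream],
    start: usize,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    let n = children.len();
    for i in 0..n {
        let idx = (start + i) % n;
        if let Poll::Ready(item) = children[idx].poll_next_unpin(cx) {
            return Poll::Ready(item);
        }
    }
    Poll::Pending
}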
Hmm, this may be similar in spirit to a related problem the sort-preserving merge operator has (dealing with pending inputs). I will discuss with @berkaysynnada, think about this a little bit, and circle back.
This is a good example, I will try to find a solution.
Which issue does this PR close?
Rationale for this change
Some AggregateExecs can always make progress and thus they may never notice that the plan was canceled.
🧵 Yield-based Cooperative Scheduling in AggregateExec
This PR introduces a lightweight yielding mechanism to AggregateExec to improve responsiveness to external signals (like Ctrl-C) without adding any measurable performance overhead.
🧭 Motivation
For aggregation queries without any GROUP BY …
Similarly for queries like … where the computational work for each input is substantial.
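For example, the reproducers used earlier in this thread:

SELECT SUM(value)
FROM range(1, 50000000000) AS t;

SELECT (value % 10) AS group_key, COUNT(*) AS cnt, SUM(value) AS sum_val
FROM range(1, 5000000000) AS t
GROUP BY (value % 10)
ORDER BY group_key;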
Are these changes tested?
Yes
Before this PR:
The query would stay stuck until done; we couldn't Ctrl-C to stop it.
Are there any user-facing changes?
Some CPU-heavy aggregation plans now cancel much sooner.