AggregateExec not cancellable #16193
Comments
take
Submitted PR #16196. The PR is limited to solving streaming aggregation with no grouping; we can extend it to more cases if it doesn't affect performance.
I believe this sounds similar to this PR from @jeffreyssmith2nd. @carols10cents has made a benchmark here:
Yes, this is more or less the same issue. PR #14028 proposed adding a yield point at the leaf of the plan when moving from one file to the next. This PR adds yield points closer to the top of the plan tree, just below the AggregateExec's stream, by wrapping its input so that it yields every 64 input batches. I was wondering whether that should be row-count or time-interval based rather than batch-count based. I found #15314 in the meantime as well. This issue provides one concrete and easily reproducible example of a query that cannot be cancelled.
Edit: the conclusion in the PR #14028 discussion was that calling
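For readers following along, a wrapper of the kind described above looks roughly like the following. This is only an illustrative sketch, not the code from PR #16196; the YieldStream name, its fields, and the fixed frequency are invented here. After a number of consecutive Ready batches it returns Pending once while waking itself, which gives Tokio a yield point at which the task can be cancelled:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};

/// Illustrative sketch only; not the actual PR implementation.
struct YieldStream<S> {
    inner: S,
    ready_count: usize,
    frequency: usize,
}

impl<S: Stream + Unpin> Stream for YieldStream<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // After `frequency` consecutive Ready items, return Pending once and
        // wake ourselves immediately. That Pending is the yield point Tokio
        // needs in order to be able to cancel the surrounding task.
        if self.ready_count >= self.frequency {
            self.ready_count = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(item) => {
                self.ready_count += 1;
                Poll::Ready(item)
            }
            Poll::Pending => {
                self.ready_count = 0;
                Poll::Pending
            }
        }
    }
}
```

A row-count or time-interval based variant would only change the condition guarding the early return.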
I've updated the issue title and description to be less broad and to focus on cancelling aggregations instead. I hope I got the Tokio details right.
Thank you @pepijnve for the review. The reason I was not using row count is that we would need to calculate batch_size * batch count, and we don't want to affect the performance of DataFusion's core logic. Even with a batch count of 64, I am wondering whether it will affect core performance when we have huge amounts of data. I am also wondering whether we can wrap the yield logic outside the core exec logic in DataFusion, for example on the datafusion-cli side, if Ctrl-C in datafusion-cli is all we want to support. But it seems there are more cases besides datafusion-cli that want to terminate the stream, for example customers who use gRPC to terminate: We can be confident to merge if we compare the aggregate ClickBench results and see no performance regression.
Update: no performance regression for the PR with heavy aggregate testing:
Just for context, I ran into this while working on a Java-based application that drives DataFusion queries. I want to be able to interrupt query execution from the Java side. From the Java side I'm basically calling
I still need to write an experiment for this, but based on my reading of the documentation, even with a
Thank you @pepijnve for this info. I tried the aggregation with more than 1 partition and it seems it can be cancelled, and it uses RecordBatchReceiverStream inside the CoalescePartitionsExec. Do you mean it's not the RecordBatchReceiverStream that helps the cancellation?
Trying to figure this out 😄 I'm mainly a Java developer; still getting my head around async Rust.
FYI I'm basing myself on the Tokio documentation and https://users.rust-lang.org/t/tokio-does-not-terminate-all-tasks-immediately-on-program-exit/100790/10
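For reference, the behaviour described in those docs can be reproduced in isolation: JoinHandle::abort only takes effect at a yield point, so a task that never returns Pending just keeps running after being aborted. A minimal, DataFusion-independent sketch (assuming Tokio with the rt-multi-thread, time, and macros features; all names here are arbitrary):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main(worker_threads = 2)]
async fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let stop_flag = Arc::clone(&stop);

    // A task that never hits an .await: Tokio can only cancel a task at a
    // yield point, so abort() has no visible effect while it is spinning.
    let mut busy = tokio::spawn(async move {
        while !stop_flag.load(Ordering::Relaxed) {
            std::hint::spin_loop();
        }
    });

    tokio::time::sleep(Duration::from_millis(50)).await;
    busy.abort();

    // Still running 50 ms after abort(): the timeout fires instead of the join.
    let timed_out = tokio::time::timeout(Duration::from_millis(50), &mut busy)
        .await
        .is_err();
    println!("busy loop survived abort(): {timed_out}");

    // It only stops once we ask it to cooperatively.
    stop.store(true, Ordering::Relaxed);
    let _ = busy.await;
}
```

The same mechanism is what keeps a non-yielding aggregation stream alive after the driving task is dropped or aborted.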
I am trying to find a solution with the smallest change. The idea is that maybe we can also wrap the plan with CoalescePartitionsExec when the partition count is 1; if that shows no regression, I believe it's the easiest and safest way.
Tried it now; it also works when wrapping with CoalescePartitionsExec when the partition count is 1.

```diff
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 114f83068..ffb24463e 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
0 => internal_err!(
"CoalescePartitionsExec requires at least one input partition"
),
- 1 => {
- // bypass any threading / metrics if there is a single partition
- self.input.execute(0, context)
- }
+ // 1 => {
+ // // bypass any threading / metrics if there is a single partition
+ // self.input.execute(0, context)
+ // }
_ => {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
// record the (very) minimal work done so that
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8be..8bb8b2145 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -963,8 +963,7 @@ pub fn execute_stream(
) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
- 1 => plan.execute(0, context),
- 2.. => {
+ 1.. => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
// CoalescePartitionsExec must produce a single partition
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd7..107b36603 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0
```
@zhuqi-lucas @alamb I slapped together something quickly to test my cancellation hypothesis. See https://gist.github.com/pepijnve/c013a697b1869ea067e793bf3e1e115a
For me this outputs the following, which seems to confirm what I was thinking. Am I missing some essential element in the gist to make cancellation work?
@pepijnve It works for me; the changed code is here:

```toml
tokio = { workspace = true, features = ["macros", "signal"] }
```

```rust
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::functions_aggregate::sum;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::{common, physical_plan};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::error::Error;
use std::fmt::Formatter;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::signal::ctrl_c;
use datafusion::prelude::SessionContext;
struct InfiniteStream {
batch: RecordBatch,
poll_count: usize,
}
impl RecordBatchStream for InfiniteStream {
fn schema(&self) -> SchemaRef {
self.batch.schema()
}
}
impl Stream for InfiniteStream {
type Item = common::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_count += 1;
if self.poll_count % 10_000 == 0 {
println!("InfiniteStream::poll_next {} times", self.poll_count);
}
Poll::Ready(Some(Ok(self.batch.clone())))
}
}
#[derive(Debug)]
struct InfiniteExec {
batch: RecordBatch,
properties: PlanProperties,
}
impl InfiniteExec {
fn new(batch: &RecordBatch) -> Self {
InfiniteExec {
batch: batch.clone(),
properties: PlanProperties::new(
EquivalenceProperties::new(batch.schema().clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Unbounded {
requires_infinite_memory: false,
},
),
}
}
}
impl DisplayAs for InfiniteExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(f, "infinite")
}
}
impl ExecutionPlan for InfiniteExec {
fn name(&self) -> &str {
"infinite"
}
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.batch.schema()
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> common::Result<Arc<dyn ExecutionPlan>> {
Ok(self.clone())
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> common::Result<SendableRecordBatchStream> {
Ok(Box::pin(InfiniteStream {
batch: self.batch.clone(),
poll_count: 0,
}))
}
}
#[tokio::test]
async fn main() -> Result<(), Box<dyn Error>> {
// 1) build session & schema & sample batch
let session_ctx = SessionContext::new();
let schema = Arc::new(Schema::new(Fields::try_from(vec![
Field::new("value", DataType::Int64, false),
])?));
let mut builder = Int64Array::builder(8192);
for v in 0..8192 {
builder.append_value(v);
}
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;
// 2) set up the infinite source + aggregation
let inf = Arc::new(InfiniteExec::new(&batch));
let aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Single,
PhysicalGroupBy::new(vec![], vec![], vec![]),
vec![Arc::new(
AggregateExprBuilder::new(sum::sum_udaf(), vec![Arc::new(
datafusion::physical_expr::expressions::Column::new_with_schema(
"value", &schema,
)?
)])
.schema(inf.schema())
.alias("sum")
.build()?,
)],
vec![None],
inf.clone(),
inf.schema(),
)?);
// 3) get the stream
let mut stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?;
println!("Running query; press Ctrl-C to cancel");
// 4) drive the stream inline in select!
let result = tokio::select! {
batch_opt = async {
loop {
if let Some(item) = stream.next().await {
break Some(item);
} else {
break None;
}
}
} => batch_opt,
_ = ctrl_c() => {
println!("Cancellation received!");
None
}
};
// 5) handle the outcome
match result {
None => println!("No result (cancelled or empty)"),
Some(Ok(batch)) => println!("Got batch with {} rows", batch.num_rows()),
Some(Err(e)) => eprintln!("Error: {}", e),
}
println!("Exiting, stream will be dropped now");
Ok(())
}
```

Testing result; Ctrl-C works well:

```
Running query; press Ctrl-C to cancel
InfiniteStream::poll_next 10000 times
InfiniteStream::poll_next 20000 times
InfiniteStream::poll_next 30000 times
InfiniteStream::poll_next 40000 times
InfiniteStream::poll_next 50000 times
Cancellation received!
No result (cancelled or empty)
Exiting, stream will be dropped now
```
Interesting, it seems to give us an example we can use in datafusion-cli to support quick cancellation!
🤔 Testing on my machine, your adapted version of the code still just keeps on running; Ctrl-C does nothing. The only change I've made is to replace
I tried taking the signal handling out of the picture as well, by using
I'm testing on macOS fwiw. I'll run the same experiment in a Linux VM as well. I've copied the code over to https://github.com/pepijnve/datafusion_cancel_test
Just tested on Linux and Windows; same results as on macOS. With
And with
In both cases the poll loop keeps on running until the process is killed.
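That observation is consistent with how select! and polling work in general: a select! branch only regains control when the future currently being polled returns Pending, so a branch that is always immediately Ready starves the others. A small sketch, independent of DataFusion (assuming Tokio with the macros and time features), showing the same starvation with a plain counter loop in place of the query stream:

```rust
use std::future::ready;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // The first branch awaits only futures that are immediately Ready, so it
    // never yields back to select!. The 1 ms timer elapses long before the
    // loop ends, but its branch is never polled again in the meantime.
    let winner = tokio::select! {
        n = async {
            let mut n: u64 = 0;
            for _ in 0..50_000_000u64 {
                n += ready(1u64).await; // always Ready: not a yield point
            }
            n
        } => format!("non-yielding loop finished first (n = {n})"),
        _ = tokio::time::sleep(Duration::from_millis(1)) => "timer branch won".to_string(),
    };
    println!("{winner}"); // prints the non-yielding-loop message
}
```

Replacing ready(1u64).await with something that occasionally returns Pending, for example tokio::task::yield_now().await, lets the timer branch win; that is essentially what the yield-wrapper approach does for the aggregation stream.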
I was using the IDE to run the test and to terminate it, so maybe it was terminated by the IDE... We can try again based on the PR, because it seems the PR does not affect performance.
Describe the bug
The streams created by AggregateExec consume their input in a loop in their poll implementations without ever explicitly yielding. If the input stream never returns Pending (which is often the case for file input), Tokio will never have the opportunity to abort the running task.

This is often hidden by the presence of CoalesceExec in a query plan, which runs the aggregation in a separate task. CoalesceExec uses RecordBatchReceiverStream, which does return Pending itself and as a consequence is cancellable, but dropping/aborting the aggregation task will still not immediately stop it, since Tokio can only stop a task when it yields.
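To make the pattern concrete, here is a minimal sketch (illustrative only, not the actual AggregateExec implementation; the type and field names are invented) of a poll_next that drains its input in a loop. If the input is always Ready, the Pending arm is never reached and the runtime never gets a chance to cancel the task:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};

/// Illustrative only: a toy "aggregate" over an i64 stream, written the
/// problematic way.
struct NonYieldingSum<S> {
    input: S,
    sum: i64,
}

impl<S: Stream<Item = i64> + Unpin> Stream for NonYieldingSum<S> {
    type Item = i64;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match self.input.poll_next_unpin(cx) {
                // Input exhausted: emit the final aggregate value.
                Poll::Ready(None) => return Poll::Ready(Some(self.sum)),
                // Accumulate and immediately pull the next input item.
                Poll::Ready(Some(v)) => self.sum += v,
                // The only exit back to the runtime; never reached when the
                // input is always Ready.
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
```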
To Reproduce
In datafusion-cli:

```sql
SET datafusion.execution.target_partitions = 1;
SELECT sum(column) from table;
```

on some relatively large table so that the query takes long enough to run.

Expected behavior
Additional context
No response