AggregateExec not cancellable #16193

Open
pepijnve opened this issue May 27, 2025 · 22 comments · May be fixed by #16196

@pepijnve
Contributor

pepijnve commented May 27, 2025

Describe the bug

The streams created by AggregateExec consume their input in a loop in their poll implementations without ever explicitly yielding. If the input stream never returns Pending (which is often the case for file input), Tokio will never have the opportunity to abort the running task.

This is often hidden by the presence of CoalescePartitionsExec in a query plan, which runs the aggregation in a separate task. CoalescePartitionsExec uses RecordBatchReceiverStream, which does return Pending itself and as a consequence is cancellable, but dropping/aborting the aggregation task will still not immediately stop it, since Tokio can only stop a task when it yields.
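
Roughly, the problematic shape is the following (a minimal, hypothetical sketch; EagerAggregateStream is a made-up name and this is not the actual AggregateStream code):

use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{Stream, StreamExt};

// Hypothetical illustration only: as long as the input keeps returning
// Poll::Ready, this loop never returns Poll::Pending, so the Tokio scheduler
// never gets a chance to abort the task that is polling it.
struct EagerAggregateStream<S> {
    input: S,
    running_sum: i64,
    finished: bool,
}

impl<S> Stream for EagerAggregateStream<S>
where
    S: Stream<Item = i64> + Unpin,
{
    type Item = i64;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }
        loop {
            match self.input.poll_next_unpin(cx) {
                // Input exhausted: emit the final aggregate once.
                Poll::Ready(None) => {
                    self.finished = true;
                    return Poll::Ready(Some(self.running_sum));
                }
                // Keep accumulating without ever yielding back to the runtime.
                Poll::Ready(Some(v)) => self.running_sum += v,
                // Only here does control return to Tokio; a file-backed input
                // may never hit this branch.
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}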

To Reproduce

  • Start datafusion-cli
  • Execute SET datafusion.execution.target_partitions = 1;
  • Execute SELECT sum(column) FROM table; against a relatively large table so that the query runs long enough to interrupt
  • Press ctrl-c

Expected behavior

  • The running query is cancelled

Additional context

No response

@pepijnve added the bug (Something isn't working) label on May 27, 2025
@zhuqi-lucas
Contributor

take

@zhuqi-lucas
Contributor

zhuqi-lucas commented May 27, 2025

Submitted the PR #16196

The PR is limited to solve aggregate with no group streaming, we can extend to more cases if it's not affecting performance?

@alamb
Contributor

alamb commented May 27, 2025

I believe this sounds similar to this PR from @jeffreyssmith2nd:

@carols10cents has made a benchmark here:

@pepijnve
Contributor Author

pepijnve commented May 27, 2025

Yes this is more or less the same issue. PR #14028 proposed adding a yield point at the leaf of the plan when moving from one file to the next. This PR adds yield points closer to the top of the plan tree just below the AggregateExec's stream by wrapping its input and then yields every 64 input batches. I was wondering if that should be row count or time interval based rather than batch count based.

I found #15314 in the meantime as well. This issue provides one concrete and easily reproducible example of a query that cannot be cancelled.

The comments on PR #14028 regarding Tokio's yield_now are interesting and relevant for PR #16196 as well. It seems the code pattern should be calling wake_by_ref and returning Pending rather than using yield_now. I can run some tests to see what the actual behavior is in the single-threaded and multi-threaded Tokio runtimes if that helps.

Edit: conclusion in PR #14028 discussion was that calling wake_by_ref was fine.
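
For illustration, the yield-point idea amounts to something like the following wrapper stream (a rough sketch with made-up names and an illustrative constant; this is not the code from PR #16196):

use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{Stream, StreamExt};

// Illustrative threshold; the PR may use a different value or mechanism.
const YIELD_EVERY: usize = 64;

// Wraps an input stream and inserts a cooperative yield point every
// YIELD_EVERY items: wake ourselves immediately, then return Pending so the
// runtime gets a chance to cancel/abort the surrounding task.
struct YieldStream<S> {
    inner: S,
    since_last_yield: usize,
}

impl<S: Stream + Unpin> Stream for YieldStream<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.since_last_yield >= YIELD_EVERY {
            self.since_last_yield = 0;
            // wake_by_ref reschedules this task right away, but returning
            // Pending first is what makes it cancellable.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(item)) => {
                self.since_last_yield += 1;
                Poll::Ready(Some(item))
            }
            other => other,
        }
    }
}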

@pepijnve
Contributor Author

@alamb how are you guys handling general housekeeping in the issue tracker? Should I close this issue since it's a specific example of #15314?

@alamb
Contributor

alamb commented May 27, 2025

@alamb how are you guys handling general housekeeping in the issue tracker? Should I close this issue since it's a specific example of #15314?

That would be fine.

Alternatively, I think we can close this ticket once #16196 is merged too

@pepijnve
Contributor Author

pepijnve commented May 28, 2025

I've updated the issue title and description to be less broad and focus on cancelling aggregations instead. I hope I got the tokio details right.

@pepijnve changed the title from "Inability to cancel certain plans" to "AggregateExec not cancellable" on May 28, 2025
@zhuqi-lucas
Contributor

zhuqi-lucas commented May 28, 2025

Yes this is more or less the same issue. PR #14028 proposed adding a yield point at the leaf of the plan when moving from one file to the next. This PR adds yield points closer to the top of the plan tree just below the AggregateExec's stream by wrapping its input and then yields every 64 input batches. I was wondering if that should be row count or time interval based rather than batch count based.

I found #15314 in the meantime as well. This issue provides one concrete and easily reproducible example of a query that cannot be cancelled.

The comments on PR #14028 regarding Tokio's yield_now are interesting and relevant for PR #16196 as well. It seems the code pattern should be calling wake_by_ref and returning Pending rather than using yield_now. I can run some tests to see what the actual behavior is in the single-threaded and multi-threaded Tokio runtimes if that helps.

Edit: conclusion in PR #14028 discussion was that calling wake_by_ref was fine.

Thank you @pepijnve for the review. The reason I did not use row count is that we would need to calculate batch_size * batch count, and we want to avoid affecting the performance of DataFusion's core logic. Even with a batch count of 64, I am wondering whether it will affect core performance when we have huge data.

I am also wondering if we can wrap the yield logic outside DataFusion's core exec logic, for example on the datafusion-cli side, if we only want to handle ctrl-c in datafusion-cli.

But it seems there are more cases besides datafusion-cli that want to terminate the stream, for example customers who terminate via gRPC:

#14036 (comment)

We can be confident merging if we compare the aggregate ClickBench results and see no performance regression.

@zhuqi-lucas
Contributor

Update: no performance regression for the PR with heavy aggregate testing:

#16196 (comment)

@pepijnve
Contributor Author

pepijnve commented May 28, 2025

Just for context, I ran into this while working on a Java based application that drives the DataFusion queries. I want to be able to interrupt query execution from the Java side. From the Java side I'm basically calling runtime.block_on(async { record_batch_stream.next().await }). Since I can't use Java's thread interruption mechanism to unblock the next() call I was trying to use tokio::select! with a cancellation token instead and that failed to cancel. This is the same pattern as what the CLI does, just triggered via a different channel than a signal handler.

I still need to write an experiment for this, but based on my reading of the documentation even with a RecordBatchReceiverStream the aggregate would not actually get cancelled. It will look like it has cancelled because the top level task will yield, the JoinSet will get dropped, and the spawned tasks will be aborted. But tokio will only actually be able to terminate the task once the aggregate stream's poll loop yields.
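
Concretely, the embedding pattern I'm using is roughly the following (a sketch, assuming the tokio-util crate's CancellationToken; the function name is made up):

use futures::StreamExt;
use tokio_util::sync::CancellationToken;
use datafusion::execution::SendableRecordBatchStream;

// Blocks the calling (Java-side) thread on the next batch, racing it against a
// cancellation token. The select! returns as soon as the token is cancelled,
// but the aggregate's poll loop is only truly stopped if it eventually yields.
fn next_batch_blocking(
    runtime: &tokio::runtime::Runtime,
    stream: &mut SendableRecordBatchStream,
    cancel: &CancellationToken,
) -> Option<datafusion::common::Result<arrow::record_batch::RecordBatch>> {
    runtime.block_on(async {
        tokio::select! {
            batch = stream.next() => batch,
            _ = cancel.cancelled() => None,
        }
    })
}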

@zhuqi-lucas
Contributor

Just for context, I ran into this while working on a Java based application that drives the DataFusion queries. I want to be able to interrupt query execution from the Java side. From the Java side I'm basically calling runtime.block_on(async { record_batch_stream.next().await }). Since I can't use Java's thread interruption mechanism to unblock the next() call I was trying to use tokio::select! with a cancellation token instead and that failed to cancel. This is the same pattern as what the CLI does, just triggered via a different channel than a signal handler.

I still need to write an experiment for this, but based on my reading of the documentation even with a RecordBatchReceiverStream the aggregate would not actually get cancelled. It will look like it has cancelled because the top level task will yield, the JoinSet will get dropped, and the spawned tasks will be aborted. But tokio will only actually be able to terminate the task once the aggregate stream's poll loop yields.

Thank you @pepijnve for this info. I tried the aggregate with more than 1 partition and it seems it can be cancelled, and it uses RecordBatchReceiverStream inside CoalescePartitionsExec. Do you mean it's not the RecordBatchReceiverStream that helps the cancellation?

@pepijnve
Contributor Author

pepijnve commented May 28, 2025

do you mean it's not the RecordBatchReceiverStream which help the cancellation?

Trying to figure this out 😄 I'm a Java developer mainly; still getting my head around async Rust.

@pepijnve
Contributor Author

pepijnve commented May 28, 2025

FYI, I'm basing this on the Tokio documentation and https://users.rust-lang.org/t/tokio-does-not-terminate-all-tasks-immediately-on-program-exit/100790/10

@zhuqi-lucas
Contributor

zhuqi-lucas commented May 28, 2025

I am trying to find a solution with the smallest change: maybe we can also wrap with CoalescePartitionsExec when the partition count is 1. If it shows no regression, I believe it's the easiest and safest way.

@zhuqi-lucas
Contributor

I tried it now; wrapping with CoalescePartitionsExec when the partition count is 1 also works.

diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 114f83068..ffb24463e 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
             0 => internal_err!(
                 "CoalescePartitionsExec requires at least one input partition"
             ),
-            1 => {
-                // bypass any threading / metrics if there is a single partition
-                self.input.execute(0, context)
-            }
+            // 1 => {
+            //     // bypass any threading / metrics if there is a single partition
+            //     self.input.execute(0, context)
+            // }
             _ => {
                 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                 // record the (very) minimal work done so that
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8be..8bb8b2145 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -963,8 +963,7 @@ pub fn execute_stream(
 ) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
         0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
-        1 => plan.execute(0, context),
-        2.. => {
+        1.. => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
             // CoalescePartitionsExec must produce a single partition
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd7..107b36603 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0

@pepijnve
Contributor Author

@zhuqi-lucas @alamb I slapped together something quickly to test my cancellation hypothesis. See https://gist.github.com/pepijnve/c013a697b1869ea067e793bf3e1e115a

For me this outputs the following which seems to confirm what I was thinking. Am I missing some essential element in the gist to make cancellation work?

Running query
InfiniteStream::poll_next 10000 times
InfiniteStream::poll_next 20000 times
InfiniteStream::poll_next 30000 times
InfiniteStream::poll_next 40000 times
^Cctrl-C
No result
Dropping stream
InfiniteStream::poll_next 50000 times
InfiniteStream::poll_next 60000 times
InfiniteStream::poll_next 70000 times
InfiniteStream::poll_next 80000 times
InfiniteStream::poll_next 90000 times
InfiniteStream::poll_next 100000 times
InfiniteStream::poll_next 110000 times
... never stops

@zhuqi-lucas
Contributor

@pepijnve It works for me; the changed code is here:

# Cargo.toml: enable the tokio signal feature
tokio = { workspace = true, features = ["macros", "signal"] }
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::functions_aggregate::sum;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::{common, physical_plan};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::error::Error;
use std::fmt::Formatter;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::signal::ctrl_c;
use datafusion::prelude::SessionContext;

struct InfiniteStream {
    batch: RecordBatch,
    poll_count: usize,
}

impl RecordBatchStream for InfiniteStream {
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }
}

impl Stream for InfiniteStream {
    type Item = common::Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_count += 1;
        if self.poll_count % 10_000 == 0 {
            println!("InfiniteStream::poll_next {} times", self.poll_count);
        }
        Poll::Ready(Some(Ok(self.batch.clone())))
    }
}

#[derive(Debug)]
struct InfiniteExec {
    batch: RecordBatch,
    properties: PlanProperties,
}

impl InfiniteExec {
    fn new(batch: &RecordBatch) -> Self {
        InfiniteExec {
            batch: batch.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(batch.schema().clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            ),
        }
    }
}

impl DisplayAs for InfiniteExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "infinite")
    }
}

impl ExecutionPlan for InfiniteExec {
    fn name(&self) -> &str {
        "infinite"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> common::Result<Arc<dyn ExecutionPlan>> {
        Ok(self.clone())
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> common::Result<SendableRecordBatchStream> {
        Ok(Box::pin(InfiniteStream {
            batch: self.batch.clone(),
            poll_count: 0,
        }))
    }
}

#[tokio::test]
async fn main() -> Result<(), Box<dyn Error>> {
    // 1) build session & schema & sample batch
    let session_ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(Fields::try_from(vec![
        Field::new("value", DataType::Int64, false),
    ])?));
    let mut builder = Int64Array::builder(8192);
    for v in 0..8192 {
        builder.append_value(v);
    }
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;

    // 2) set up the infinite source + aggregation
    let inf = Arc::new(InfiniteExec::new(&batch));
    let aggr = Arc::new(AggregateExec::try_new(
        AggregateMode::Single,
        PhysicalGroupBy::new(vec![], vec![], vec![]),
        vec![Arc::new(
            AggregateExprBuilder::new(sum::sum_udaf(), vec![Arc::new(
                datafusion::physical_expr::expressions::Column::new_with_schema(
                    "value", &schema,
                )?
            )])
                .schema(inf.schema())
                .alias("sum")
                .build()?,
        )],
        vec![None],
        inf.clone(),
        inf.schema(),
    )?);

    // 3) get the stream
    let mut stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?;

    println!("Running query; press Ctrl-C to cancel");
    // 4) drive the stream inline in select!
    let result = tokio::select! {
        batch_opt = async {
            loop {
                if let Some(item) = stream.next().await {
                    break Some(item);
                } else {
                    break None;
                }
            }
        } => batch_opt,
        _ = ctrl_c() => {
            println!("Cancellation received!");
            None
        }
    };

    // 5) handle the outcome
    match result {
        None => println!("No result (cancelled or empty)"),
        Some(Ok(batch)) => println!("Got batch with {} rows", batch.num_rows()),
        Some(Err(e)) => eprintln!("Error: {}", e),
    }

    println!("Exiting, stream will be dropped now");
    Ok(())
}

Test result; ctrl-c works well:

Running query; press Ctrl-C to cancel
InfiniteStream::poll_next 10000 times
InfiniteStream::poll_next 20000 times
InfiniteStream::poll_next 30000 times
InfiniteStream::poll_next 40000 times
InfiniteStream::poll_next 50000 times
Cancellation received!
No result (cancelled or empty)
Exiting, stream will be dropped now

@zhuqi-lucas
Contributor

Interesting, this seems to give us an example we can use in datafusion-cli to support quick cancellation!

@pepijnve
Contributor Author

pepijnve commented May 28, 2025

🤔 Testing on my machine, your adapted version of the code still just keeps on running; ctrl-c does nothing. The only change I've made is to replace tokio::test with tokio::main.

I tried taking the signal handling out of the picture as well, by using tokio::time::sleep(tokio::time::Duration::from_secs(5)) instead. Same thing.
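
That variant looks roughly like this (a sketch; the function name is made up and it assumes Tokio's time feature is enabled):

use std::time::Duration;
use futures::StreamExt;
use datafusion::execution::SendableRecordBatchStream;

// Same select! shape as the gist, with the ctrl_c() arm replaced by a
// five-second sleep acting as a timeout.
async fn next_batch_or_timeout(
    mut stream: SendableRecordBatchStream,
) -> Option<datafusion::common::Result<arrow::record_batch::RecordBatch>> {
    tokio::select! {
        batch = stream.next() => batch,
        _ = tokio::time::sleep(Duration::from_secs(5)) => {
            println!("Timeout reached!");
            None
        }
    }
}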

I'm testing on macOS fwiw. I'll run the same experiment in a Linux VM as well.

I've copied the code over to https://github.com/pepijnve/datafusion_cancel_test

@pepijnve
Contributor Author

pepijnve commented May 28, 2025

Just tested on Linux and Windows; same results as on macOS.

With USE_TASK = false I see this

Running query; will time out after 5 seconds
InfiniteStream::poll_next 10000 times
InfiniteStream::poll_next 20000 times
InfiniteStream::poll_next 30000 times
InfiniteStream::poll_next 40000 times
InfiniteStream::poll_next 50000 times
InfiniteStream::poll_next 60000 times
InfiniteStream::poll_next 70000 times
InfiniteStream::poll_next 80000 times
InfiniteStream::poll_next 90000 times
InfiniteStream::poll_next 100000 times
InfiniteStream::poll_next 110000 times
InfiniteStream::poll_next 120000 times
InfiniteStream::poll_next 130000 times
InfiniteStream::poll_next 140000 times
InfiniteStream::poll_next 150000 times
InfiniteStream::poll_next 160000 times
InfiniteStream::poll_next 170000 times
InfiniteStream::poll_next 180000 times
InfiniteStream::poll_next 190000 times
...

And with USE_TASK = true

Running query; will time out after 5 seconds
InfiniteStream::poll_next 10000 times
InfiniteStream::poll_next 20000 times
InfiniteStream::poll_next 30000 times
InfiniteStream::poll_next 40000 times
InfiniteStream::poll_next 50000 times
InfiniteStream::poll_next 60000 times
InfiniteStream::poll_next 70000 times
InfiniteStream::poll_next 80000 times
InfiniteStream::poll_next 90000 times
InfiniteStream::poll_next 100000 times
InfiniteStream::poll_next 110000 times
Timeout reached!
No result (cancelled or empty)
Exiting, stream will be dropped now
InfiniteStream::poll_next 120000 times
InfiniteStream::poll_next 130000 times
InfiniteStream::poll_next 140000 times
InfiniteStream::poll_next 150000 times
InfiniteStream::poll_next 160000 times
InfiniteStream::poll_next 170000 times
...

In both cases the poll loop keeps on running until the process is killed.

@zhuqi-lucas
Contributor

I was using the IDE to run the test and terminate it; maybe it was terminated by the IDE...

We can try again based on the PR, since it seems the PR does not affect performance.

@alamb
Contributor

alamb commented May 29, 2025
