feat: Allow cancelling of grouping operations which are CPU bound #16196


Open: wants to merge 40 commits into main

Commits (40)
b18aeaa feat: support inability to yield cpu for loop when it's not using Tok… (zhuqi-lucas, May 27, 2025)
2a965aa Fix fuzz test (zhuqi-lucas, May 28, 2025)
67ca44b polish code (zhuqi-lucas, May 28, 2025)
82a179d add comments (zhuqi-lucas, May 28, 2025)
da3c2d5 fix corner case when huge data (zhuqi-lucas, May 28, 2025)
6cf3bf0 Also add grouping case (zhuqi-lucas, May 28, 2025)
3251990 Address comments (zhuqi-lucas, May 30, 2025)
a8da370 Merge remote-tracking branch 'upstream/main' into issue_16193 (zhuqi-lucas, May 30, 2025)
311849d fmt (zhuqi-lucas, May 30, 2025)
2a2ead9 Move YieldStream into physical-plan crate (alamb, May 30, 2025)
3b69287 Merge remote-tracking branch 'apache/main' into issue_16193 (alamb, May 30, 2025)
3ff9252 Use existing RecordBatchStreamAdapter (alamb, May 30, 2025)
5547c3c Add timeout testing for cancellation (zhuqi-lucas, May 31, 2025)
aeac0ef fmt (zhuqi-lucas, May 31, 2025)
6d56b78 add license (zhuqi-lucas, May 31, 2025)
4ddd1e5 Support sort exec for cancellation (zhuqi-lucas, May 31, 2025)
b2ffec7 poc: unified yield exec for leaf node (zhuqi-lucas, Jun 2, 2025)
4587c3b polish code phase 1 (zhuqi-lucas, Jun 2, 2025)
098b1ec Add license (zhuqi-lucas, Jun 2, 2025)
bc65c1a Fix testing (zhuqi-lucas, Jun 2, 2025)
da58d0b Support final path (zhuqi-lucas, Jun 2, 2025)
021fb92 fix test (zhuqi-lucas, Jun 2, 2025)
5027087 polish code (zhuqi-lucas, Jun 2, 2025)
8509d0a fix testing and address suggestions (zhuqi-lucas, Jun 2, 2025)
97c1bb7 fix (zhuqi-lucas, Jun 2, 2025)
5c3a14c remove buffer (zhuqi-lucas, Jun 3, 2025)
97923b8 address comments (zhuqi-lucas, Jun 3, 2025)
118f801 fmt (zhuqi-lucas, Jun 3, 2025)
e7a678a Fix test (zhuqi-lucas, Jun 3, 2025)
54260aa fix (zhuqi-lucas, Jun 3, 2025)
cb344af fix (zhuqi-lucas, Jun 3, 2025)
89a4e93 fix slt (zhuqi-lucas, Jun 3, 2025)
896fd59 fix tpch sql (zhuqi-lucas, Jun 3, 2025)
2887f87 Add flag for yield insert and disable default (zhuqi-lucas, Jun 4, 2025)
2de4afb recover testing (zhuqi-lucas, Jun 4, 2025)
485d55a fix (zhuqi-lucas, Jun 4, 2025)
e31a997 Update doc (zhuqi-lucas, Jun 4, 2025)
b65ab60 Address comments (zhuqi-lucas, Jun 4, 2025)
1bf2ad3 fix fmt (zhuqi-lucas, Jun 4, 2025)
8ab64b3 Support config for yield frequency (zhuqi-lucas, Jun 4, 2025)
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
@@ -722,6 +722,19 @@ config_namespace! {
         /// then the output will be coerced to a non-view.
         /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
         pub expand_views_at_output: bool, default = false
+
+        /// When true, the optimizer will insert a Yield operator at the leaf nodes of
+        /// any pipeline that contains a pipeline-breaking operator, allowing the Tokio
+        /// scheduler to switch to other tasks while waiting.
+        /// Default: false (disabled).
+        pub enable_add_yield_for_pipeline_break: bool, default = false
+
+        /// Yield frequency in batches: how many batches to process before yielding
+        /// control back to the Tokio scheduler. The default is 64, meaning execution
+        /// yields after every 64 batches.
+        /// This setting only takes effect when `enable_add_yield_for_pipeline_break`
+        /// is true, and its value must be greater than 0.
+        pub yield_frequency_for_pipeline_break: usize, default = 64
     }
 }
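
A minimal sketch of enabling both options programmatically; the option paths follow the `optimizer` namespace above, `SessionConfig::options_mut` appears in the tests in this PR, and the frequency of 32 is just an illustrative override:

use datafusion::prelude::*;

fn main() {
    let mut config = SessionConfig::new();
    // Ask the optimizer to wrap leaf nodes under pipeline-breaking operators.
    config
        .options_mut()
        .optimizer
        .enable_add_yield_for_pipeline_break = true;
    // Yield every 32 batches instead of the default 64 (illustrative value).
    config
        .options_mut()
        .optimizer
        .yield_frequency_for_pipeline_break = 32;
    let _ctx = SessionContext::new_with_config(config);
}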

249 changes: 249 additions & 0 deletions datafusion/core/tests/execution/infinite_cancel.rs
@@ -0,0 +1,249 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow_schema::SortOptions;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::functions_aggregate::sum;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::aggregates::{
    AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion::prelude::SessionContext;
use datafusion::{common, physical_plan};
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_optimizer::wrap_leaves_cancellation::WrapLeaves;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort::SortExec;
use futures::{Stream, StreamExt};
use std::any::Any;
use std::error::Error;
use std::fmt::Formatter;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::select;

/// A stream that re-emits the same batch forever and never returns
/// `Poll::Pending`, so by itself it never gives control back to Tokio.
struct InfiniteStream {
    batch: RecordBatch,
    poll_count: usize,
}

impl RecordBatchStream for InfiniteStream {
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }
}

impl Stream for InfiniteStream {
    type Item = common::Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Always `Ready`: without an injected yield point, a consumer of
        // this stream monopolizes its Tokio worker thread.
        self.poll_count += 1;
        Poll::Ready(Some(Ok(self.batch.clone())))
    }
}
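
For contrast with InfiniteStream above, here is a simplified sketch of the yield mechanism this PR relies on. It is not the PR's exact YieldStream implementation (the constructor and internals may differ), only the general pattern of a cooperative yield point: after `frequency` consecutive ready batches, return `Pending` once and self-wake, so timers and cancellation futures get a chance to run.

use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

struct YieldingStream<S> {
    inner: S,
    frequency: usize,
    ready_count: usize,
}

impl<S: Stream + Unpin> Stream for YieldingStream<S> {
    type Item = S::Item;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.ready_count >= self.frequency {
            // Yield point: reschedule ourselves, then let other tasks run.
            self.ready_count = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Ready(item) => {
                self.ready_count += 1;
                Poll::Ready(item)
            }
            Poll::Pending => {
                // The inner stream yielded on its own; reset the counter.
                self.ready_count = 0;
                Poll::Pending
            }
        }
    }
}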

#[derive(Debug)]
struct InfiniteExec {
    batch: RecordBatch,
    properties: PlanProperties,
}

impl InfiniteExec {
    fn new(batch: &RecordBatch) -> Self {
        InfiniteExec {
            batch: batch.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(batch.schema().clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            ),
        }
    }
}

impl DisplayAs for InfiniteExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "infinite")
    }
}

impl ExecutionPlan for InfiniteExec {
    fn name(&self) -> &str {
        "infinite"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> common::Result<Arc<dyn ExecutionPlan>> {
        Ok(self.clone())
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> common::Result<SendableRecordBatchStream> {
        Ok(Box::pin(InfiniteStream {
            batch: self.batch.clone(),
            poll_count: 0,
        }))
    }
}

#[tokio::test]
async fn test_infinite_agg_cancel() -> Result<(), Box<dyn Error>> {
    // 1) build session & schema & sample batch
    let session_ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )])));
    let mut builder = Int64Array::builder(8192);
    for v in 0..8192 {
        builder.append_value(v);
    }
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;

    // 2) set up the infinite source + aggregation
    let inf = Arc::new(InfiniteExec::new(&batch));
    let aggr = Arc::new(AggregateExec::try_new(
        AggregateMode::Single,
        PhysicalGroupBy::new(vec![], vec![], vec![]),
        vec![Arc::new(
            AggregateExprBuilder::new(
                sum::sum_udaf(),
                vec![Arc::new(Column::new_with_schema("value", &schema)?)],
            )
            .schema(inf.schema())
            .alias("sum")
            .build()?,
        )],
        vec![None],
        inf.clone(),
        inf.schema(),
    )?);

    // 3) optimize the plan with WrapLeaves to auto-insert Yield
    let mut config = ConfigOptions::new();
    config.optimizer.enable_add_yield_for_pipeline_break = true;
    let optimized = WrapLeaves::new().optimize(aggr.clone(), &config)?;

    // 4) get the stream
    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
    const TIMEOUT: u64 = 1;

    // 5) drive the stream inline in select!
    let result = select! {
        batch_opt = stream.next() => batch_opt,
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
            None
        }
    };

    assert!(result.is_none(), "Expected timeout, but got a result");
    Ok(())
}

#[tokio::test]
async fn test_infinite_sort_cancel() -> Result<(), Box<dyn Error>> {
    // 1) build session & schema & sample batch
    let session_ctx = SessionContext::new();
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));
    let mut builder = Int64Array::builder(8192);
    for v in 0..8192 {
        builder.append_value(v);
    }
    let array = builder.finish();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;

    // 2) set up the infinite source
    let inf = Arc::new(InfiniteExec::new(&batch));

    // 3) set up a SortExec that will never finish because input is infinite
    let sort_options = SortOptions {
        descending: false,
        nulls_first: true,
    };
    let sort_expr = PhysicalSortExpr::new(
        Arc::new(Column::new_with_schema("value", &schema)?),
        sort_options,
    );
    // LexOrdering is just Vec<PhysicalSortExpr>
    let lex_ordering: datafusion::physical_expr::LexOrdering = vec![sort_expr].into();
    let sort_exec = Arc::new(SortExec::new(lex_ordering, inf.clone()));

    // 4) optimize the plan with WrapLeaves to auto-insert Yield
    let mut config = ConfigOptions::new();
    config.optimizer.enable_add_yield_for_pipeline_break = true;
    let optimized = WrapLeaves::new().optimize(sort_exec.clone(), &config)?;

    // 5) get the stream
    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
    const TIMEOUT: u64 = 1;

    // 6) drive the stream inline in select!
    let result = select! {
        batch_opt = stream.next() => batch_opt,
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
            None
        }
    };

    assert!(
        result.is_none(),
        "Expected timeout for sort, but got a result"
    );
    Ok(())
}
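
The select! pattern above is one way to observe cancellation; a hypothetical helper built on `tokio::time::timeout` reads the same way (the function name is illustrative, not part of this PR). It only returns `None` on timeout because the wrapped leaves periodically return `Pending`, giving the timer a chance to fire:

use std::time::Duration;

use arrow::array::RecordBatch;
use datafusion::common::Result;
use datafusion::execution::SendableRecordBatchStream;
use futures::StreamExt;

// Poll the first batch of a stream, giving up after `deadline`.
async fn first_batch_with_deadline(
    mut stream: SendableRecordBatchStream,
    deadline: Duration,
) -> Option<Result<RecordBatch>> {
    // `Err(Elapsed)` from `timeout` means the deadline hit first.
    tokio::time::timeout(deadline, stream.next())
        .await
        .unwrap_or(None)
}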
1 change: 1 addition & 0 deletions datafusion/core/tests/execution/mod.rs
@@ -15,4 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.

+mod infinite_cancel;
 mod logical_plan;
12 changes: 10 additions & 2 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -3531,9 +3531,16 @@ async fn test_distribute_sort_memtable() -> Result<()> {
     );

     let mem_table = create_memtable()?;
-    let session_config = SessionConfig::new()
+    let mut session_config = SessionConfig::new()
         .with_repartition_file_min_size(1000)
         .with_target_partitions(3);

+    // Enable add yield for pipeline break testing.
+    session_config
+        .options_mut()
+        .optimizer
+        .enable_add_yield_for_pipeline_break = true;
+
     let ctx = SessionContext::new_with_config(session_config);
     ctx.register_table("users", Arc::new(mem_table))?;

@@ -3544,7 +3551,8 @@
     let expected = &[
         "SortPreservingMergeExec: [id@0 ASC NULLS LAST]",
         "  SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]",
-        "    DataSourceExec: partitions=3, partition_sizes=[34, 33, 33]",
+        "    YieldStreamExec frequency=64",
+        "      DataSourceExec: partitions=3, partition_sizes=[34, 33, 33]",
     ];
     plans_matches_expected!(expected, physical_plan);

1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
@@ -41,5 +41,6 @@ pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
 pub mod utils;
+pub mod wrap_leaves_cancellation;

 pub use optimizer::PhysicalOptimizerRule;
2 changes: 2 additions & 0 deletions datafusion/physical-optimizer/src/optimizer.rs
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;

+use crate::wrap_leaves_cancellation::WrapLeaves;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::Result;
 use datafusion_physical_plan::ExecutionPlan;
@@ -137,6 +138,7 @@ impl PhysicalOptimizer {
             // are not present, the load of executors such as join or union will be
             // reduced by narrowing their input tables.
             Arc::new(ProjectionPushdown::new()),
+            Arc::new(WrapLeaves::new()),
             // The SanityCheckPlan rule checks whether the order and
             // distribution requirements of each node in the plan
             // is satisfied. It will also reject non-runnable query
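
For orientation, a simplified sketch of what a leaf-wrapping rule like WrapLeaves does conceptually: recurse to the leaves and wrap each one in the yield-inserting operator. The `YieldStreamExec` constructor signature and module path below are assumptions for illustration; the real rule is also gated on `enable_add_yield_for_pipeline_break` and targets pipelines containing a pipeline-breaking operator.

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;
// Module path assumed for illustration; see this PR for the real location.
use datafusion_physical_plan::yield_stream::YieldStreamExec;

fn wrap_leaves(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    let frequency = config.optimizer.yield_frequency_for_pipeline_break;
    if plan.children().is_empty() {
        // Leaf node: insert the cooperative yield point here.
        Ok(Arc::new(YieldStreamExec::new(plan, frequency)))
    } else {
        // Interior node: rebuild it with recursively wrapped children.
        let children = plan
            .children()
            .into_iter()
            .map(|child| wrap_leaves(Arc::clone(child), config))
            .collect::<Result<Vec<_>>>()?;
        plan.with_new_children(children)
    }
}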