
feat: Allow cancelling of grouping operations which are CPU bound #16196


Open
wants to merge 28 commits into main

Conversation

zhuqi-lucas
Contributor

@zhuqi-lucas zhuqi-lucas commented May 27, 2025

Which issue does this PR close?

Rationale for this change

Some AggregateExecs can always make progress and thus they may never notice that the plan was canceled.

🧵 Yield-based Cooperative Scheduling in AggregateExec

This PR introduces a lightweight yielding mechanism to AggregateExec to improve responsiveness to external signals (like Ctrl-C) without adding any measurable performance overhead.
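Roughly, the input stream of such an aggregation is wrapped in an adapter that, after a fixed number of ready batches, wakes its own waker and returns Poll::Pending so the Tokio runtime gets a chance to observe cancellation. A minimal sketch of such a wrapper (names and the threshold are illustrative; the exact form is refined in the review discussion below):

use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use futures::{Stream, StreamExt};

/// Illustrative threshold: yield back to the runtime after this many batches.
const YIELD_BATCHES: usize = 64;

/// Wraps an inner batch stream and periodically yields to the runtime.
struct YieldStream<S> {
    inner: S,
    batches_processed: usize,
}

impl<S> Stream for YieldStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.batches_processed >= YIELD_BATCHES {
            // Re-schedule ourselves and yield, so a cancelled task can stop here.
            self.batches_processed = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                self.batches_processed += 1;
                Poll::Ready(Some(Ok(batch)))
            }
            // The child already yielded, so reset the budget.
            Poll::Pending => {
                self.batches_processed = 0;
                Poll::Pending
            }
            other => other,
        }
    }
}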

🧭 Motivation

For aggregation queries without any GROUP BY, such as

SELECT SUM(value) FROM range(1, 50000000000);

Similarly for queries like

SELECT DISTINCT a, b, c, d, ... 

where the computational work for each input is substantial.

Are these changes tested?

Yes

Before this PR:

SET datafusion.execution.target_partitions = 1;
SELECT SUM(value) FROM range(1, 50000000000);

It always runs until completion; we can't use Ctrl-C to stop it.

Are there any user-facing changes?

Some CPU-heavy aggregation plans now cancel much sooner.

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label May 27, 2025
@zhuqi-lucas
Contributor Author

This PR is limited to the aggregate stream with no grouping; should we extend it to more cases if it doesn't affect performance?

@zhuqi-lucas zhuqi-lucas changed the title feat: support inability to yeild cpu for loop when it's not using Tok… feat: support inability to yeild for loop when it's not using Tok… May 27, 2025
@zhuqi-lucas zhuqi-lucas marked this pull request as draft May 27, 2025 15:10
@alamb
Contributor

alamb commented May 27, 2025

Thanks @zhuqi-lucas -- I'll try running the cancellation benchmark from @carols10cents

@alamb
Contributor

alamb commented May 27, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing issue_16193 (b18aeaa) to 2d12bf6 diff
Benchmarks: cancellation
Results will be posted here when complete


@alamb
Contributor

alamb commented May 27, 2025

🤖: Benchmark completed

Details

Comparing HEAD and issue_16193
--------------------
Benchmark cancellation.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃    HEAD ┃ issue_16193 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QCancellati… │ 31.29ms │     33.69ms │ 1.08x slower │
└──────────────┴─────────┴─────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ Benchmark Summary          ┃         ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ Total Time (HEAD)          │ 31.29ms │
│ Total Time (issue_16193)   │ 33.69ms │
│ Average Time (HEAD)        │ 31.29ms │
│ Average Time (issue_16193) │ 33.69ms │
│ Queries Faster             │       0 │
│ Queries Slower             │       1 │
│ Queries with No Change     │       0 │
└────────────────────────────┴─────────┘

@zhuqi-lucas
Contributor Author

zhuqi-lucas commented May 28, 2025

Thank you @alamb for the review and benchmark.

I am wondering whether it will hurt DataFusion's own runtime performance, because we add an `if` check in the aggregate and other core execs.

If we only want to support cancellation in datafusion-cli, maybe we can add the wrapper logic to datafusion-cli itself.

But judging from another related issue, some customers use gRPC to drop the stream, so this is not limited to datafusion-cli:

#14036 (comment)

Maybe the ideal solution is:

  1. A public drop-stream interface which customers (gRPC, etc.) can call to cancel. I still can't find a solution for this besides changing the core exec logic itself...
  2. A wrapper for datafusion-cli, so that when we press Ctrl-C we can cancel the execution (a consumer-side sketch follows below).
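For illustration, a consumer-side "drop the stream" pattern could look like the sketch below (assumptions: the standard SessionContext / DataFrame::execute_stream API and tokio_util's CancellationToken; nothing here is added by this PR). The catch -- and the point of this PR -- is that the select! can only switch branches when the operators actually return Pending to the runtime.

use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt;
use tokio_util::sync::CancellationToken;

/// Hypothetical consumer-side cancellation: stop polling (and drop) the stream
/// when an external signal fires, e.g. a gRPC client disconnect or Ctrl-C.
async fn run_until_cancelled(
    ctx: &SessionContext,
    sql: &str,
    cancel: CancellationToken,
) -> Result<()> {
    let df = ctx.sql(sql).await?;
    let mut stream = df.execute_stream().await?;
    loop {
        tokio::select! {
            // Dropping `stream` on this branch is what cancels the plan, but it
            // only takes effect once the operators return control to the runtime.
            _ = cancel.cancelled() => break,
            next = stream.next() => match next {
                Some(batch) => {
                    let _batch = batch?; // forward the batch to the client here
                }
                None => break,
            },
        }
    }
    Ok(())
}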

@zhuqi-lucas
Contributor Author

I polished the code so that it only affects the no-grouping aggregate; maybe we can compare ClickBench results, so we can be confident to merge if it doesn't affect aggregate performance.

@zhuqi-lucas zhuqi-lucas marked this pull request as ready for review May 28, 2025 10:31
@zhuqi-lucas zhuqi-lucas changed the title feat: support inability to yeild for loop when it's not using Tok… feat: support inability to yield for loop when it's not using Tok… May 28, 2025
@zhuqi-lucas
Contributor Author

zhuqi-lucas commented May 28, 2025

Updated the performance numbers for the current PR:

SET datafusion.execution.target_partitions = 1;

SELECT SUM(value)
FROM range(1,50000000000) AS t;
+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.315 seconds.

The main branch:

SET datafusion.execution.target_partitions = 1;


SELECT SUM(value)
FROM range(1,50000000000) AS t;
+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.567 seconds.

No performance regression from the above testing.

@@ -77,6 +77,11 @@ impl AggregateStream {
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
let input = agg.input.execute(partition, Arc::clone(&context))?;

// Only wrap no‐grouping aggregates in our YieldStream
Contributor

In my own testing with partition_count = 1, group-by aggregates suffer from the same problem.

Contributor Author

Thank you @pepijnve for the review, I will try to reproduce it for group-by aggregates.

Contributor Author

You are right, I can reproduce it now:

SELECT
  (value % 10)         AS group_key,
  COUNT(*)             AS cnt,
  SUM(value)           AS sum_val
FROM range(1, 5000000000) AS t
GROUP BY (value % 10)
ORDER BY group_key;

Contributor Author

Thank you @pepijnve, I also added the grouping support in the latest PR.

Contributor Author

From the testing results, it seems the group-by cases have some performance regression.

@zhuqi-lucas
Contributor Author

Another solution is to use CoalescePartitionsExec as a wrapper:

diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 114f83068..ffb24463e 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
             0 => internal_err!(
                 "CoalescePartitionsExec requires at least one input partition"
             ),
-            1 => {
-                // bypass any threading / metrics if there is a single partition
-                self.input.execute(0, context)
-            }
+            // 1 => {
+            //     // bypass any threading / metrics if there is a single partition
+            //     self.input.execute(0, context)
+            // }
             _ => {
                 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                 // record the (very) minimal work done so that
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8be..8bb8b2145 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -963,8 +963,7 @@ pub fn execute_stream(
 ) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
         0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
-        1 => plan.execute(0, context),
-        2.. => {
+        1.. => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
             // CoalescePartitionsExec must produce a single partition
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd7..107b36603 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0

@zhuqi-lucas
Contributor Author

Hi @alamb, I believe we can also run the ClickBench benchmark for this PR. But I am not confident about the result, since it seems we always add some overhead to the aggregate. Thanks!

@alamb
Contributor

alamb commented May 28, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing issue_16193 (6cf3bf0) to 2d12bf6 diff
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete

@alamb
Contributor

alamb commented May 28, 2025

🤖: Benchmark completed

Details

Comparing HEAD and issue_16193
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ issue_16193 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 0     │  1884.53ms │   1932.55ms │    no change │
│ QQuery 1     │   692.42ms │    704.34ms │    no change │
│ QQuery 2     │  1422.09ms │   1424.04ms │    no change │
│ QQuery 3     │   727.80ms │    722.42ms │    no change │
│ QQuery 4     │  1434.99ms │   1447.24ms │    no change │
│ QQuery 5     │ 15295.77ms │  15299.58ms │    no change │
│ QQuery 6     │  1997.15ms │   2013.51ms │    no change │
│ QQuery 7     │  2049.62ms │   2168.78ms │ 1.06x slower │
│ QQuery 8     │   836.82ms │    848.06ms │    no change │
└──────────────┴────────────┴─────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 26341.19ms │
│ Total Time (issue_16193)   │ 26560.52ms │
│ Average Time (HEAD)        │  2926.80ms │
│ Average Time (issue_16193) │  2951.17ms │
│ Queries Faster             │          0 │
│ Queries Slower             │          1 │
│ Queries with No Change     │          8 │
└────────────────────────────┴────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ issue_16193 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 0     │    15.54ms │     15.42ms │    no change │
│ QQuery 1     │    33.15ms │     33.70ms │    no change │
│ QQuery 2     │    80.58ms │     80.48ms │    no change │
│ QQuery 3     │    96.06ms │     95.48ms │    no change │
│ QQuery 4     │   588.87ms │    595.63ms │    no change │
│ QQuery 5     │   818.19ms │    815.70ms │    no change │
│ QQuery 6     │    23.64ms │     23.47ms │    no change │
│ QQuery 7     │    37.27ms │     36.33ms │    no change │
│ QQuery 8     │   926.36ms │    910.32ms │    no change │
│ QQuery 9     │  1187.66ms │   1209.81ms │    no change │
│ QQuery 10    │   267.19ms │    257.24ms │    no change │
│ QQuery 11    │   293.30ms │    292.35ms │    no change │
│ QQuery 12    │   911.85ms │    911.14ms │    no change │
│ QQuery 13    │  1247.36ms │   1317.95ms │ 1.06x slower │
│ QQuery 14    │   848.42ms │    844.38ms │    no change │
│ QQuery 15    │   834.03ms │    835.94ms │    no change │
│ QQuery 16    │  1736.24ms │   1752.77ms │    no change │
│ QQuery 17    │  1613.09ms │   1606.14ms │    no change │
│ QQuery 18    │  3084.41ms │   3051.47ms │    no change │
│ QQuery 19    │    83.51ms │     84.57ms │    no change │
│ QQuery 20    │  1125.18ms │   1120.50ms │    no change │
│ QQuery 21    │  1285.82ms │   1308.43ms │    no change │
│ QQuery 22    │  2139.79ms │   2162.32ms │    no change │
│ QQuery 23    │  8003.46ms │   7992.05ms │    no change │
│ QQuery 24    │   463.30ms │    450.76ms │    no change │
│ QQuery 25    │   384.45ms │    381.51ms │    no change │
│ QQuery 26    │   521.56ms │    522.91ms │    no change │
│ QQuery 27    │  1586.93ms │   1590.23ms │    no change │
│ QQuery 28    │ 12567.57ms │  12455.67ms │    no change │
│ QQuery 29    │   526.70ms │    534.84ms │    no change │
│ QQuery 30    │   807.81ms │    815.44ms │    no change │
│ QQuery 31    │   858.67ms │    848.84ms │    no change │
│ QQuery 32    │  2639.73ms │   2641.91ms │    no change │
│ QQuery 33    │  3342.77ms │   3316.54ms │    no change │
│ QQuery 34    │  3312.00ms │   3325.35ms │    no change │
│ QQuery 35    │  1312.86ms │   1309.17ms │    no change │
│ QQuery 36    │   117.39ms │    116.89ms │    no change │
│ QQuery 37    │    55.88ms │     56.08ms │    no change │
│ QQuery 38    │   121.82ms │    119.37ms │    no change │
│ QQuery 39    │   192.09ms │    195.42ms │    no change │
│ QQuery 40    │    44.75ms │     49.63ms │ 1.11x slower │
│ QQuery 41    │    42.75ms │     44.34ms │    no change │
│ QQuery 42    │    38.39ms │     38.13ms │    no change │
└──────────────┴────────────┴─────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 56218.38ms │
│ Total Time (issue_16193)   │ 56166.59ms │
│ Average Time (HEAD)        │  1307.40ms │
│ Average Time (issue_16193) │  1306.20ms │
│ Queries Faster             │          0 │
│ Queries Slower             │          2 │
│ Queries with No Change     │         41 │
└────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ issue_16193 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 122.53ms │    122.67ms │     no change │
│ QQuery 2     │  21.63ms │     21.72ms │     no change │
│ QQuery 3     │  35.49ms │     33.31ms │ +1.07x faster │
│ QQuery 4     │  19.72ms │     20.20ms │     no change │
│ QQuery 5     │  51.62ms │     52.88ms │     no change │
│ QQuery 6     │  11.84ms │     11.99ms │     no change │
│ QQuery 7     │  97.55ms │     94.51ms │     no change │
│ QQuery 8     │  25.70ms │     25.67ms │     no change │
│ QQuery 9     │  59.26ms │     59.82ms │     no change │
│ QQuery 10    │  56.45ms │     56.10ms │     no change │
│ QQuery 11    │  11.50ms │     11.77ms │     no change │
│ QQuery 12    │  41.66ms │     39.51ms │ +1.05x faster │
│ QQuery 13    │  27.43ms │     26.76ms │     no change │
│ QQuery 14    │   9.52ms │      9.56ms │     no change │
│ QQuery 15    │  23.59ms │     22.84ms │     no change │
│ QQuery 16    │  21.71ms │     21.73ms │     no change │
│ QQuery 17    │  94.44ms │     95.31ms │     no change │
│ QQuery 18    │ 210.48ms │    215.52ms │     no change │
│ QQuery 19    │  26.60ms │     26.03ms │     no change │
│ QQuery 20    │  35.72ms │     36.88ms │     no change │
│ QQuery 21    │ 160.00ms │    160.99ms │     no change │
│ QQuery 22    │  16.65ms │     17.19ms │     no change │
└──────────────┴──────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary          ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 1181.11ms │
│ Total Time (issue_16193)   │ 1182.95ms │
│ Average Time (HEAD)        │   53.69ms │
│ Average Time (issue_16193) │   53.77ms │
│ Queries Faster             │         2 │
│ Queries Slower             │         0 │
│ Queries with No Change     │        20 │
└────────────────────────────┴───────────┘


@alamb
Contributor

alamb commented May 28, 2025

Running the benchmarks again to gather more details


@zhuqi-lucas
Contributor Author

🤖: Benchmark completed

Details

Comparing HEAD and issue_16193
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ issue_16193 ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 0     │  1906.70ms │   1850.03ms │ no change │
│ QQuery 1     │   691.18ms │    691.76ms │ no change │
│ QQuery 2     │  1411.18ms │   1436.19ms │ no change │
│ QQuery 3     │   696.17ms │    700.91ms │ no change │
│ QQuery 4     │  1438.68ms │   1458.59ms │ no change │
│ QQuery 5     │ 15100.87ms │  14874.14ms │ no change │
│ QQuery 6     │  1996.44ms │   1982.89ms │ no change │
│ QQuery 7     │  2089.84ms │   2063.71ms │ no change │
│ QQuery 8     │   830.88ms │    835.14ms │ no change │
└──────────────┴────────────┴─────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 26161.93ms │
│ Total Time (issue_16193)   │ 25893.35ms │
│ Average Time (HEAD)        │  2906.88ms │
│ Average Time (issue_16193) │  2877.04ms │
│ Queries Faster             │          0 │
│ Queries Slower             │          0 │
│ Queries with No Change     │          9 │
└────────────────────────────┴────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ issue_16193 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    15.18ms │     15.80ms │     no change │
│ QQuery 1     │    33.16ms │     32.97ms │     no change │
│ QQuery 2     │    77.60ms │     80.92ms │     no change │
│ QQuery 3     │    97.69ms │     97.56ms │     no change │
│ QQuery 4     │   589.28ms │    594.05ms │     no change │
│ QQuery 5     │   847.85ms │    836.44ms │     no change │
│ QQuery 6     │    23.38ms │     22.84ms │     no change │
│ QQuery 7     │    39.02ms │     36.26ms │ +1.08x faster │
│ QQuery 8     │   923.80ms │    927.93ms │     no change │
│ QQuery 9     │  1194.31ms │   1194.76ms │     no change │
│ QQuery 10    │   261.49ms │    264.83ms │     no change │
│ QQuery 11    │   298.34ms │    300.14ms │     no change │
│ QQuery 12    │   889.86ms │    893.87ms │     no change │
│ QQuery 13    │  1343.97ms │   1319.24ms │     no change │
│ QQuery 14    │   850.37ms │    847.33ms │     no change │
│ QQuery 15    │   829.14ms │    827.15ms │     no change │
│ QQuery 16    │  1756.84ms │   1712.96ms │     no change │
│ QQuery 17    │  1607.84ms │   1612.75ms │     no change │
│ QQuery 18    │  3257.72ms │   3059.25ms │ +1.06x faster │
│ QQuery 19    │    84.43ms │     81.15ms │     no change │
│ QQuery 20    │  1143.94ms │   1098.25ms │     no change │
│ QQuery 21    │  1299.30ms │   1302.19ms │     no change │
│ QQuery 22    │  2139.01ms │   2154.26ms │     no change │
│ QQuery 23    │  7959.94ms │   7886.41ms │     no change │
│ QQuery 24    │   463.30ms │    458.37ms │     no change │
│ QQuery 25    │   386.27ms │    380.85ms │     no change │
│ QQuery 26    │   524.88ms │    519.71ms │     no change │
│ QQuery 27    │  1581.12ms │   1583.97ms │     no change │
│ QQuery 28    │ 12730.61ms │  12509.52ms │     no change │
│ QQuery 29    │   536.15ms │    528.59ms │     no change │
│ QQuery 30    │   793.18ms │    795.03ms │     no change │
│ QQuery 31    │   857.17ms │    852.83ms │     no change │
│ QQuery 32    │  2653.95ms │   2631.77ms │     no change │
│ QQuery 33    │  3315.12ms │   3322.64ms │     no change │
│ QQuery 34    │  3381.42ms │   3355.04ms │     no change │
│ QQuery 35    │  1317.57ms │   1278.45ms │     no change │
│ QQuery 36    │   129.98ms │    122.68ms │ +1.06x faster │
│ QQuery 37    │    56.20ms │     53.71ms │     no change │
│ QQuery 38    │   119.55ms │    121.04ms │     no change │
│ QQuery 39    │   193.87ms │    194.76ms │     no change │
│ QQuery 40    │    48.93ms │     49.63ms │     no change │
│ QQuery 41    │    44.38ms │     45.77ms │     no change │
│ QQuery 42    │    36.47ms │     37.46ms │     no change │
└──────────────┴────────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 56733.55ms │
│ Total Time (issue_16193)   │ 56041.14ms │
│ Average Time (HEAD)        │  1319.38ms │
│ Average Time (issue_16193) │  1303.28ms │
│ Queries Faster             │          3 │
│ Queries Slower             │          0 │
│ Queries with No Change     │         40 │
└────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ issue_16193 ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1     │ 120.69ms │    123.19ms │ no change │
│ QQuery 2     │  22.13ms │     21.87ms │ no change │
│ QQuery 3     │  34.29ms │     34.30ms │ no change │
│ QQuery 4     │  19.41ms │     19.60ms │ no change │
│ QQuery 5     │  52.72ms │     51.09ms │ no change │
│ QQuery 6     │  11.87ms │     12.05ms │ no change │
│ QQuery 7     │  96.12ms │     93.63ms │ no change │
│ QQuery 8     │  25.70ms │     25.62ms │ no change │
│ QQuery 9     │  58.58ms │     58.90ms │ no change │
│ QQuery 10    │  55.95ms │     56.01ms │ no change │
│ QQuery 11    │  11.46ms │     11.44ms │ no change │
│ QQuery 12    │  41.40ms │     40.05ms │ no change │
│ QQuery 13    │  27.95ms │     27.54ms │ no change │
│ QQuery 14    │   9.57ms │      9.64ms │ no change │
│ QQuery 15    │  23.71ms │     24.29ms │ no change │
│ QQuery 16    │  21.66ms │     22.40ms │ no change │
│ QQuery 17    │  96.13ms │     95.60ms │ no change │
│ QQuery 18    │ 209.28ms │    219.00ms │ no change │
│ QQuery 19    │  26.43ms │     25.81ms │ no change │
│ QQuery 20    │  37.60ms │     36.09ms │ no change │
│ QQuery 21    │ 161.77ms │    163.00ms │ no change │
│ QQuery 22    │  16.00ms │     16.74ms │ no change │
└──────────────┴──────────┴─────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary          ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 1180.44ms │
│ Total Time (issue_16193)   │ 1187.87ms │
│ Average Time (HEAD)        │   53.66ms │
│ Average Time (issue_16193) │   53.99ms │
│ Queries Faster             │         0 │
│ Queries Slower             │         0 │
│ Queries with No Change     │        22 │
└────────────────────────────┴───────────┘

Thank you @alamb, it's surprising that there is no performance regression, and it's even faster for clickbench_partitioned. It may be because we yield for each partition's execution, which makes the partitions run more efficiently.

@xudong963 xudong963 changed the title feat: support inability to yield for loop when it's not using Tok… feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) May 29, 2025
@zhuqi-lucas
Contributor Author

EmissionType is the closest to what I have in mind I think. Would it make sense to wrap the inputs of ExecutionPlans that report EmissionType::Final as an approximation? A hypothetical ConsumptionType per child would allow only wrapping the build side of a hash join for instance.

We're kind of still 'modifying' each operator, but in a declarative fashion rather than requiring the logic of each implementation to be updated.

@zhuqi-lucas, I think you are very close to a non-invasive solution that we can ship quickly 🚀 IIUC, this doesn't modify any operators, it just inserts a cancellation-friendly parent to all leaves.

Three things come to mind as finishing touches:

  1. Making the rule smarter, by utilizing EmissionType information, so that it only adds YieldExecs when necessary. If the path from a leaf to the root does not involve any operator that is pipeline-breaking, there is no need to insert a YieldExec as the parent of that leaf. I think this is similar to @pepijnve's thinking.
  2. Having a configuration flag to sidestep this rule for users who don't want to get any (however small) performance penalty.
  3. Benchmarking to make sure the performance penalty is small.

Thanks for the awesome collaboration 💪

Thank you @pepijnve, @ozankabak, I will try to polish the code and make all CI tasks green!

@pepijnve
Contributor

pepijnve commented Jun 2, 2025

Making the rule smarter, by utilizing EmissionType information, so that it only adds YieldExecs when necessary. If the path from a leaf to the root does not involve any operator that is pipeline-breaking, there is no need to insert a YieldExec as the parent of that leaf. I think this is similar to @pepijnve's thinking.

My thinking was that we could use EmissionType to insert the yield wrapper closer to where it's needed rather than at the leaves.

So if you have

AggregateExec -> final
  FilterExec -> copy input behavior
    ProjectionExec -> copy input behavior
      DataSourceExec -> incremental

rather than adding yield as parent of the leaves

AggregateExec -> final
  FilterExec -> copy input behavior
    ProjectionExec -> copy input behavior
      YieldExec -> copy input behavior
        DataSourceExec -> incremental

you would add it as parent of the children of the node with emission type final

AggregateExec -> final
  YieldExec -> copy input behavior
    FilterExec -> copy input behavior
      ProjectionExec -> copy input behavior
        DataSourceExec -> incremental

I'll admit that I didn't work out all the possible scenarios, but my reasoning is that if you have more than one pipeline-breaking operator in a chain, this will work more reliably, since you're 'fixing' each of them rather than injecting yield points at the leaves and hoping they propagate through the entire chain.
An additional benefit might be that it's more straightforward to implement the plan transformation this way, since it only requires very local context (i.e. the 'final' nodes themselves).
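For illustration, that child-of-pipeline-breaker variant could be expressed as a plan rewrite roughly like the sketch below (assumptions: YieldStreamExec is the yield wrapper operator discussed in this PR, and import paths are approximate):

use std::sync::Arc;

use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::ExecutionPlan;

// `YieldStreamExec` below is the yield wrapper operator added by this PR
// (assumed to be in scope here).

/// Wrap the direct children of every pipeline-breaking ("final" emission)
/// operator, so that its blocking loop polls a stream that periodically yields.
fn wrap_children_of_final_nodes(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    if plan.properties().emission_type == EmissionType::Final && !plan.children().is_empty() {
        let new_children = plan
            .children()
            .into_iter()
            .map(|child| {
                Arc::new(YieldStreamExec::new(Arc::clone(child))) as Arc<dyn ExecutionPlan>
            })
            .collect();
        Ok(Transformed::yes(plan.with_new_children(new_children)?))
    } else {
        Ok(Transformed::no(plan))
    }
}

// Applied over the whole plan with, for example:
//     plan.transform_down(wrap_children_of_final_nodes).data()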

@zhuqi-lucas
Contributor Author

My thinking was that we could use EmissionType to insert the yield wrapper closer to where it's needed rather than at the leaves. [...]

It makes sense and is a smart solution! I will try to address it.

@ozankabak
Contributor

ozankabak commented Jun 2, 2025

My thinking was that we could use EmissionType to insert the yield wrapper closer to where it's needed rather than at the leaves.

I don't think this is a good idea. We can have many operators in a plan that break the pipeline, especially in plans with joins and windowing (cascades of windowing operators are quite common). Simply adding a YieldExec as a parent to each pipeline-breaking operator would be bad -- prompting the question of where to insert the YieldExec in such plans.

Inserting it as a parent of leaf nodes, and only when necessary (first item in my message above), gives us a system where the least number of necessary YieldExecs are inserted, and at a non-arbitrary place.

@zhuqi-lucas
Contributor Author

My thinking was that we could use EmissionType to insert the yield wrapper closer to where it's needed rather than at the leaves.

I don't think this is a good idea. We can have many operators in a plan that break the pipeline, especially in plans with joins and windowing (cascades of windowing operators are quite common). Simply adding a YieldExec as a parent to each pipeline-breaking operator would be bad -- prompting the question of where to insert the YieldExec in such plans.

Inserting it as a parent of leaf nodes, and only when necessary (first item in my message above), gives us a system where the least number of necessary YieldExecs are inserted, and at a non-arbitrary place.

I agree with this now. I actually tried using EmissionType to insert the YieldExec wrapper closer to where it's needed instead of just at the leaves.

However, during SQL logic test runs with complex plans, it caused many errors, including stack overflows. I believe this approach introduces more edge cases and makes the behavior harder to reason about.

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Jun 2, 2025
Poll::Ready(Some(Ok(batch)))
}
}
other => other,
Contributor

When the child stream returns Pending, it's also ok to reset the batches_processed counter to zero.

Contributor Author

Good suggestion, thank you @pepijnve !

}
}

impl ExecutionPlan for YieldStreamExec {
Contributor

Should YieldStreamExec implement more of the ExecutionPlan trait? I'm not an expert on these things, but here's my assessment from going through the functions quickly:

  • maintains_input_order can return true
  • supports_limit_pushdown can, I think, return true.
  • statistics should probably just delegate to the child
  • cardinality_effect can return Equal.
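The delegating overrides suggested above might look roughly like the following on the wrapper operator. This is a sketch, not the PR's exact code: only the additional overrides are shown, the required ExecutionPlan methods are elided, and `input` is an assumed field name for the child plan.

use datafusion_common::{Result, Statistics};
use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::ExecutionPlan;

impl ExecutionPlan for YieldStreamExec {
    // ... name(), properties(), children(), execute(), etc. elided ...

    fn maintains_input_order(&self) -> Vec<bool> {
        // Batches pass through unchanged, so the input order is preserved.
        vec![true]
    }

    fn supports_limit_pushdown(&self) -> bool {
        // A limit is just as effective below the yield wrapper as above it.
        true
    }

    fn statistics(&self) -> Result<Statistics> {
        // Pure pass-through: delegate to the child.
        self.input.statistics()
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        // The wrapper neither adds nor removes rows.
        CardinalityEffect::Equal
    }
}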

Contributor Author

Addressed in the latest PR, and also fixed the tests. Thanks!

@@ -33,12 +33,12 @@ use std::borrow::Cow;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::AggregateExec;
Contributor

Consider reverting this change and the change in row_hash.rs to keep those files out of the commit?

Contributor Author

Good suggestion, thank you @pepijnve !

Contributor Author

I tried the format command and it generates the same result, so we can keep it here.

plan: Arc<dyn ExecutionPlan>,
has_pipeline_breaking_above: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let children = plan.children();
Contributor

Is it worth trying to use the TreeNode transformation facilities here? Something like below. It's just as much code, but makes use of the facilities provided by the crate.

fn wrap_leaves(
    plan: Arc<dyn ExecutionPlan>,
) -> common::Result<Transformed<Arc<dyn ExecutionPlan>>> {
    if plan.children().is_empty() {
        // Leaf node: wrap it in `YieldStreamExec` and stop recursing
        Ok(Transformed::new(Arc::new(YieldStreamExec::new(plan)), true, TreeNodeRecursion::Jump))
    } else {
        Ok(Transformed::no(plan))
    }
}

fn wrap_leaves_of_pipeline_breakers(
    plan: Arc<dyn ExecutionPlan>,
) -> common::Result<Transformed<Arc<dyn ExecutionPlan>>> {
    let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final;
    if is_pipeline_breaker {
        let mut transformed = plan.transform_down(wrap_leaves)?;
        transformed.tnr = TreeNodeRecursion::Jump; 
        Ok(transformed)
    } else {
        Ok(Transformed::no(plan))
    }
}

impl PhysicalOptimizerRule for WrapLeaves {
    fn optimize(&self, plan: Arc<dyn ExecutionPlan>, _: &ConfigOptions) -> common::Result<Arc<dyn ExecutionPlan>> {
        plan.transform_down(wrap_leaves_of_pipeline_breakers).data()
    }
}

Contributor Author

Good suggestion! I will try to address this.

Contributor Author

Addressed in the latest PR.

@pepijnve
Contributor

pepijnve commented Jun 2, 2025

Inserting it as a parent of leaf nodes, and only when necessary (first item in my message above), gives us a system where the least number of necessary YieldExecs are inserted, and at a non-arbitrary place.

I think this might still be insufficient for something like this.

Agg:
  Interleave:
    Agg:
      CoalesceBatches:
        Filter:
          Yield:
            Data
    Agg:
      Yield:
        Data

The filter + coalesce is there to ensure the Pendings coming from the Yields are not emitted in perfect lockstep.

What I think can happen here is that CombinedRecordBatchStream created by Interleave will end up ping-ponging between the two child aggregates when they return Pending and you still get stuck in the for loop of the parent Aggregate.

I did not verify this yet, but it might be worth adding a test case for this type of situation as well.

@ozankabak
Contributor

ozankabak commented Jun 2, 2025

I did not verify this yet, but it might be worth adding a test case for this type of situation as well.

Agreed, let's add a test for this. I don't think we'll have a problem, but maybe I'm wrong and it is easy to verify.

@pepijnve
Contributor

pepijnve commented Jun 2, 2025

Agreed, let's add a test for this. I don't think we'll have a problem, but maybe I'm wrong and it is easy to verify.

Working on it and it seems to get cancelled indeed. I'll work on understanding why and report back.

) -> Poll<Option<Self::Item>> {
let this = &mut *self;

if let Some(batch) = this.buffer.take() {
Contributor

I think you can make this even simpler by moving this block

if this.batches_processed >= YIELD_BATCHES {
    this.batches_processed = 0;
    cx.waker().wake_by_ref();
    return Poll::Pending;
}

all the way to the beginning of the function. That way there's no need for the buffer and you'll still get the desired effect of returning Pending regularly.

Contributor Author

@zhuqi-lucas zhuqi-lucas Jun 3, 2025

I did not add the buffer originally, but if we remove the buffer it generates a wrong result; here is the reproducer case:

SELECT SUM(value)
FROM range(1,500000000) AS t;
+--------------------+
| sum(t.value)       |
+--------------------+
| 123047621195087232 |
+--------------------+
1 row(s) fetched.
Elapsed 0.248 seconds.

Correct result from the main branch:

SELECT SUM(value)
FROM range(1,500000000) AS t;
+--------------------+
| sum(t.value)       |
+--------------------+
| 124999999750000000 |
+--------------------+
1 row(s) fetched.
Elapsed 0.319 seconds.

So I added the buffer logic, and the test cases then produce the right answer.

Contributor Author

Got it now, thank you @pepijnve. We can remove the buffer if we change it to:

// Stream<Item = Result<RecordBatch>> to poll_next_unpin
impl Stream for YieldStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.batches_processed >= YIELD_BATCHES {
            self.batches_processed = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }

        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                self.batches_processed += 1;
                Poll::Ready(Some(Ok(batch)))
            }
            Poll::Pending => {
                self.batches_processed = 0;
                Poll::Pending
            }

            other => other,
        }
    }
}

Addressed in the latest PR.

@pepijnve
Contributor

pepijnve commented Jun 2, 2025

Working on it and it seems to get cancelled indeed. I'll work on understanding why and report back.

@ozankabak I was able to get another non-exiting plan, as follows. It's a contrived example, but it does demonstrate how Pending does not always bubble far enough up to allow a task to actually stop. Code for this is at https://github.com/pepijnve/datafusion_cancel_test.git

While I was at it I added a child-of-pipeline-breaker wrapping test as well which is capable of cancelling this particular plan.

  AggregateExec: mode=Single, gby=[], aggr=[total]
    InterleaveExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 8192
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 8111
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 8030
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 7949
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 7868
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 7787
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 7706
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 7625
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 7544
          YieldStreamExec yield=64
            InfiniteExec
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: value@0 > 7463
          YieldStreamExec yield=64
            InfiniteExec

@ozankabak
Contributor

ozankabak commented Jun 2, 2025

Hmm, cool example! I want to understand exactly what is going on. I have a hunch this might be due to InterleaveExec not behaving "ideally", but I could be wrong. Can you help me understand what happens here?

BTW, if we can't find a way to solve this problem by inserting only a few YieldExecs, we can still fall back to a rule that is less "optimal" in the number of YieldExecs but always cancels (e.g. inserting one before every pipeline-breaker). I think the general approach of having a rule and a new operator is still very much the best we have so far.

@pepijnve
Contributor

pepijnve commented Jun 2, 2025

Agreed on the rule approach.

Interleave will poll each of its children at most once per poll call. If none of the children returns a Ready it will return Pending itself. Each poll starts at a random child index. By varying when the yields occur in each child there’s a very high likelihood that at least one child will return Ready. As a consequence the aggregate sees an always ready stream despite the yields being injected.
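For illustration, a toy combiner in the spirit of the behavior described above (not DataFusion's actual CombinedRecordBatchStream) shows why the Pendings get masked: because it skips Pending children and returns the first Ready item it finds, staggered yields in the children rarely surface as a Pending to the parent.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};

/// Toy round-robin combiner: polls children starting at a rotating index and
/// returns the first Ready item it finds (assumes at least one child;
/// completion handling omitted for brevity).
struct Combined<S> {
    children: Vec<S>,
    start: usize, // stand-in for the random starting index
}

impl<S: Stream + Unpin> Stream for Combined<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let n = this.children.len();
        this.start = (this.start + 1) % n; // vary the starting child each poll
        for i in 0..n {
            let idx = (this.start + i) % n;
            match this.children[idx].poll_next_unpin(cx) {
                Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
                // A Pending child (e.g. one whose YieldStream just yielded) is
                // simply skipped; as long as any sibling is Ready, the parent
                // never observes a Pending and keeps running.
                _ => continue,
            }
        }
        Poll::Pending
    }
}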

@ozankabak
Contributor

ozankabak commented Jun 2, 2025

Hmm, this may be similar in spirit to a related problem the sort-preserving merge operator has (dealing with pending inputs). I will discuss with @berkaysynnada, think about this a little bit, and circle back.

@zhuqi-lucas
Contributor Author

Agreed on the rule approach.

Interleave will poll each of its children at most once per poll call. If none of the children returns a Ready it will return Pending itself. Each poll starts at a random child index. By varying when the yields occur in each child there’s a very high likelihood that at least one child will return Ready. As a consequence the aggregate sees an always ready stream despite the yields being injected.

This is a good example; I'll try to find a solution.


Labels
core Core DataFusion crate optimizer Optimizer rules physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)
5 participants