Skip to content

Transform scalar correlated subqueries in Where to DependentJoin #16174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

irenjj
Copy link
Contributor

@irenjj irenjj commented May 24, 2025

Which issue does this PR close?

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added logical-expr Logical plan and expressions optimizer Optimizer rules substrait Changes to the substrait crate labels May 24, 2025
.dependent_join_on(
subquery_plan,
join_type,
vec![Expr::Literal(ScalarValue::Boolean(Some(true)))],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we have shorter syntax:

use datafusion_expr::lit;
let some_exprs = vec![lit(true)];

_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
if let LogicalPlan::Filter(ref filter) = plan {
match &filter.predicate {
Copy link
Contributor

@duongcongtoai duongcongtoai May 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here are more cases i can think of:

  1. a predicate can be a complex expressions such as
 where column1=(scalar_subquery) or column2=(exists_subquery)

In this case 2 nested dependent join will be generated

  1. The scalar subquery exprs sometimes is not the direct child of the predicate for example
where column1 > 1 + (subquery)
  1. We can have 2 subqueries in the same binary expr
where (subquery1) > (subquery2) + 1

subquery_plan,
join_type,
vec![Expr::Literal(ScalarValue::Boolean(Some(true)))],
subquery.outer_ref_columns.clone(),
Copy link
Contributor

@duongcongtoai duongcongtoai May 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the subquery has some nested subquery underneath, i believe this function won't be able to return the outer_ref_columns from lower level.
For example

where column1=(select count(*) from inner_table_lv1 lv1 where lv1.column2=lv0.column2 and exists (
  select * from inner_table_lv2 lv2 where lv2.column1=lv1.column1 and lv2.column2=lv0.column3
)

In this case, the calls to subquery.outer_ref_columns will only returns lv0.column2, while the general framework needs to be aware of lv0.column3 as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For cases where depth > 1, DataFusion doesn't support it at the planner stage. The reason is that each time parse_subquery is called, it uses the outer_query_schema, which is the schema from the previous layer of the query:

pub(super) fn parse_scalar_subquery(
        &self,
        subquery: Query,
        input_schema: &DFSchema,
        planner_context: &mut PlannerContext,
    ) -> Result<Expr> {
        let old_outer_query_schema =
            planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
         ...

In #16060, I attempted to layer the schemas of query blocks at different depths within the PlannerContext, and record the depth of the subquery's own layer within the Subquery, then pass the PlannerContext into the optimizer. What are your thoughts on this approach? Welcome discussion of your ideas. For multi-layer cases, more detailed design and discussion may be needed. Currently, I'm more inclined to handle simple use cases between adjacent layers first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder would it be more simple to let the decorrelation optimizor aware of the depth and handle recursion itself 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder would it be more simple to let the decorrelation optimizor aware of the depth and handle recursion itself 🤔

Since there are multiple optimizer rules, I'm wondering if the depth will change because of other priority rules rewrite.🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there are multiple optimizer rules

In the final stage of this epic we only let one optimizor handle the decorrelation right?

Also in the middle of the implementation, even if we maintain multiple decorrelating rules, if existing rule such as DecorrelatePredicateSubquery or ScalarSubqueryToJoin detect any depth > 1, they will back off and leave the whole query untounched

Copy link
Contributor

@duongcongtoai duongcongtoai May 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#16016

I also implemented something like this, but inside an optimizor (still alot of details need to be added, but at least it is capable of detect the correlated columns (including the ones with depth > 1), correlated exprs, the depth of the dependent join node)

Copy link
Contributor Author

@irenjj irenjj May 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @duongcongtoai, I've seen your pr, It's much more comprehensive than mine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#16016

I also implemented something like this, but inside an optimizor (still alot of details need to be added, but at least it is capable of detect the correlated columns (including the ones with depth > 1), correlated exprs, the depth of the dependent join node)

Maybe we could implement an initial version first, then list some pending work as tracking issues? I'm actually quite eager to contribute and help out as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, i'll try to wrap up with some basic usecase and ask for review soon

@@ -351,6 +353,8 @@ fn find_inner_join(
join_type: JoinType::Inner,
join_constraint: JoinConstraint::On,
null_equals_null: false,
dependent_join: false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we use something like JoinType::DependentJoin instead of a boolean to separate it??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a bool is a little strange, so I comment TODO: maybe it's better to add a new logical plan: DependentJoin.
But if we mark dependent join by JoinType::DependentJoin, how we can know the real JoinType?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought DependentJoin's actual type was to be decided later with Decorrelate Optimizer. Hence, the suggestion, though I am not sure anymore.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
logical-expr Logical plan and expressions optimizer Optimizer rules substrait Changes to the substrait crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Transform scalar correlated subqueries in Where to DependentJoin
3 participants