Rewrite physical expressions in execution plans#20009
Rewrite physical expressions in execution plans#20009LLDay wants to merge 1 commit intoapache:mainfrom
Conversation
|
@askalt, could you review the PR as well? |
|
cc @Omega359 (I remember you were also interested in placeholders within physical plans). |
Yes, I would like to be able to reuse plans - I'll try and find time in the next day or so to review this though I'm not particularly familiar with this area of code. |
7d86e73 to
eb09e71
Compare
eb09e71 to
3046ed0
Compare
3046ed0 to
4420b7f
Compare
e4dd613 to
2186826
Compare
|
Let's resolve threads with a thumb emoji. Look almost good to me, just several details are remained. |
|
@Jefffrey could you please approve CI workflow? |
2186826 to
dcc0262
Compare
askalt
left a comment
There was a problem hiding this comment.
Thank you! It looks good to me, will try to find maintainer to review it.
dcc0262 to
966c698
Compare
| }) | ||
| } | ||
|
|
||
| fn with_filter_and_projection( |
There was a problem hiding this comment.
This is not mentioned at all in the PR description. Can you please explain why this is needed?
There was a problem hiding this comment.
Thank you for the review! I have updated the PR description. This method allows to override the physical expressions within a file source. For example, we will be able to replace placeholder physical expressions. You can see it here (I removed ResolvePlaceholdersExec from the PR, but it worked by finding the placeholders and then replacing them with literals).
There was a problem hiding this comment.
What is the call chain? Is it an optimizer rule that calls this?
There was a problem hiding this comment.
DataSourceExec::with_physical_expressions -> FileScanConfig::with_physical_expressions -> FileSource::with_filter_and_projection.
This is just one of the implementations of ExecutionPlan::with_physical_expressions, we just need additional methods that rewrite expressions in internal entities.
| fn physical_expressions<'a>( | ||
| &'a self, | ||
| ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> { | ||
| None |
There was a problem hiding this comment.
I think it would make sense to in some future version require every ExecutionPlan to implement this even if it's a no op. Otherwise it's easy to forget. I think enough ExecutionPlans would have expressions that it's worth forcing implementers to make a decision.
There was a problem hiding this comment.
I wanted these changes to be non-breaking. At least, until we implemented a reliable mechanism using these methods (such as ResolvePlaceholdersExec or something else).
| fn physical_expressions<'a>( | ||
| &'a self, | ||
| ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> { | ||
| None |
There was a problem hiding this comment.
Same note about removing default impl in the future.
| /// | ||
| /// The default implementation returns `None`, indicating that the node either has no physical | ||
| /// expressions or does not support exposing them. | ||
| fn physical_expressions<'a>( |
There was a problem hiding this comment.
cc @gabotechs we were discussing adding just this the other day
| /// Returns new file source with given filter and projection. | ||
| fn with_filter_and_projection( |
There was a problem hiding this comment.
If we do add this I think it's worth making it very clear how this is intended to be used.
For example, users should not call this from their custom TableProvider implementation to pass in projections / filters. That pushdown is handled by optimizer rules. This is only to be used by ...
There was a problem hiding this comment.
Updated the documentation of the method.
966c698 to
f92e2cd
Compare
| fn with_physical_expressions( | ||
| &self, | ||
| _params: ReplacePhysicalExpr, | ||
| ) -> Result<Option<Arc<dyn ExecutionPlan>>> { | ||
| Ok(None) | ||
| } |
There was a problem hiding this comment.
I'm not sure if this API is enough to be used in all the current ExecutionPlan implementations.
Take AggregateExec for example:
It has:
- a Vec of
AggregateFunctionExprwhere each entry containsargs: Vec<Arc<dyn PhysicalExpr>> - the same Vec of
AggregateFunctionExpralso containsVec<PhysicalSortExpr>(with one expression each) - a
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>> - a
dynamic_filter
If AggregateExecb::physical_expressions returned something like vec![expr1, expr2, expr3, expr4], how would you know to which of all those components above each expression belongs? are the 4 expressions AggregateFunctionExprs? are the first two AggregateFunctionExprs, the third one a filter_expr and the last one a dynamic filter?
There was a problem hiding this comment.
I agree we need to be sure this is the right API before moving forward with it
how would you know to which of all those components above each expression belongs?
What would the use cases be that need to differentiate this? Would they be aggregate specific -> could they downcast the plan and call functions on AggregateExec directly?
If something needs to (1) operate on all expressions and (2) have specific logic depending on where that expression is that seems like a hard API to get right.
There was a problem hiding this comment.
If AggregateExecb::physical_expressions returned something like vec![expr1, expr2, expr3, expr4], how would you know to which of all those components above each expression belongs? are the 4 expressions AggregateFunctionExprs
A layout of the vector returned from ExecutionPlan::physical_expr(...) is plan specific. The API does not provide an ability to modify the layout itself: it could not be done generically, as each plan has different sorts of expressions, so if specific one should be modified -- the right way is to downcast the plan (caller knows which expression exactly is desired to be modified).
On other hand, this API provides an ability to switch some parts of the expressions prior the plan will be executed. For example, we are going to implement placeholders in the plans based on this API, as we can write the code like:
fn resolve_placeholders(expr: &Arc<dyn PhysicalExpr) -> Arc<dyn PhysicalExpr> {
...
}
let resolved_expr = plan.physical_expr().iter().map(|p| resolve_placeholders(p));
let plan = plan.with_physical_expr(resolved_expr);
plan.execute(...)There was a problem hiding this comment.
The API does not provide an ability to modify the layout itself:
Yes, we require that the number of expressions returned by ExecutionPlan::physical_expressions match the number of expressions that we specify in ExecutionPlan::with_physical_expressions. If we specified the wrong number of expressions, the call returns an error. This invariant is enforced here (1, 2).
When we have exactly the same number of expressions, we simply replace the corresponding expressions with new ones.
Introduces `physical_expressions` and `with_physical_expressions` methods to the `ExecutionPlan` trait. These methods provide an interface to inspect and replace physical expressions within execution plan nodes. The commit also implements these methods for various ExecutionPlans and introduces a new physical expression invariant check to ensure consistency between expression retrieval and replacement.
f92e2cd to
f7d9a07
Compare

Rationale for this change
This PR covers a part of #14342.
This PR introduces methods for replacing physical expressions in execution plan nodes. This is an intermediate step in the development of physical-level placeholders. Later, these methods will be used to resolve placeholders in physical plans.
What changes are included in this PR?
physical_expressionsandwith_physical_expressionsto theExecutionPlantrait to allow inspection and replacement of expressions within plan nodes.with_filter_and_projectionto theFileSourcetrait. This method is required to implementwith_physical_expressionsforDataSourceExec. Since file-based scans store their physical expressions (pushed-down filters and projections) within theFileSourceimplementation, we need a way to reconstruct theFileSourcewith updated expressions during plan rewriting.inner_expressions_iterandwith_inner_expressionsto theWindowExprtrait and its implementations to enable expression replacement within window functions.WindowAggExecandBoundedWindowAggExecnow use these methods to support theExecutionPlanrewriting interface.Are these changes tested?
physical_expressionsandwith_physical_expressionsmethods are verified using thecheck_physical_expressionsinvariant.Are there any user-facing changes?
Yes, a user can rewrite physical expressions in some execution plans.