Problem Summary
Suppose the materialized view of a GroupAggregate is stored across multiple shards, and group aggregation is computed within each shard (considering only sum/count/max/min), so the same group can appear in more than one shard. How can Calcite's materialized-view rewriting be used to produce a correct plan under this data distribution?
Detailed Explanation
Processing of original data sources
Suppose there is a data table test_table with the following schema:
test_table
  user         varchar
  source       int
  resp_time    int
  request_body varchar
During writes, the data of this table is distributed uniformly across multiple shards (effectively round-robin), as shown in the figure below.
When running a GroupAggregate query on this table:
SELECT user, source, sum(resp_time)
FROM test_table
GROUP BY user, source
The query plan is as follows:
LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
  LogicalTableScan(table=[[test, test_table]])

PhysicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], step=[FINAL])
  PhysicalExchange(distribution=[single])
    PhysicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], step=[PARTIAL])
      PhysicalTableScan(table=[[test, test_table]], condition=[null])
The PhysicalXXX nodes here are our own custom nodes. PhysicalAggregate requires the Single RelDistribution, so an Exchange operator is added; we also have an optimizer rule that splits the PhysicalAggregate into two stages: Partial and Final.
Processing of materialized view
Suppose there is a materialized view of the following SQL
SELECT user, source, sum(resp_time) as sum_resp_time
FROM test_table
GROUP BY user, source
During writes, rows for the materialized view are uniformly routed to multiple shards, and group aggregation is computed within each shard, as shown in the figure.
This also means that, from a global perspective, each shard's aggregation result is only an intermediate result of the global aggregation; the per-shard results must be aggregated again to obtain the final result.
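As a plain-Java illustration of this point (not Calcite API; the shard contents and the `ShardMergeSketch`/`localSum` names are made up): each shard pre-aggregates its own rows, so the same group key can exist on several shards, and the FINAL step must merge the partials with the appropriate merge function (SUM and COUNT partials are merged with SUM, MAX with MAX, MIN with MIN).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardMergeSketch {
    // PARTIAL step: per-shard aggregation of (group key, resp_time) rows.
    static Map<String, Integer> localSum(List<Map.Entry<String, Integer>> rows) {
        Map<String, Integer> agg = new HashMap<>();
        for (Map.Entry<String, Integer> r : rows) {
            agg.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return agg;
    }

    public static void main(String[] args) {
        // Round-robin writes put rows of the same group on both shards.
        Map<String, Integer> shard1 = localSum(List.of(
                Map.entry("alice|1", 10), Map.entry("alice|1", 5)));
        Map<String, Integer> shard2 = localSum(List.of(
                Map.entry("alice|1", 7), Map.entry("bob|2", 3)));

        // FINAL step: merge the partial sums by summing again.
        Map<String, Integer> global = new HashMap<>(shard1);
        shard2.forEach((k, v) -> global.merge(k, v, Integer::sum));

        System.out.println(global.get("alice|1")); // 15 + 7 = 22
        System.out.println(global.get("bob|2"));   // 3
    }
}
```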
Using the materialized view
I use Calcite to plan the GroupAggregate; the plan is constructed as follows:
RelNode root = builder
    .scan(SCHEMA_NAME, TEST_TABLE.getName())
    .aggregate(builder.groupKey("user", "source"),
        builder.sum(builder.field("resp_time")))
    .build();
System.out.println(RelOptUtil.toString(root));

RelOptPlanner planner = root.getCluster().getPlanner(); // VolcanoPlanner
RelOptUtil.registerDefaultRules(planner, true, false);

RelTraitSet desiredTraits = root.getCluster().traitSet()
    .replace(RelDistributions.SINGLETON)
    .replace(PhysicalConvention.INSTANCE);
root = planner.changeTraits(root, desiredTraits);
planner.setRoot(root);
For the materialized view, I tried three approaches.
First
RelOptMaterialization sumMaterialization = new RelOptMaterialization(
    builder.scan(SCHEMA_NAME, TEST_TABLE_PREAGG.getName()).build(),
    builder.scan(SCHEMA_NAME, TEST_TABLE.getName())
        .aggregate(builder.groupKey("user", "source"),
            builder.sum(false, "sum_resp_time", builder.field("resp_time")))
        .build(),
    null, Arrays.asList(SCHEMA_NAME, TEST_TABLE_PREAGG.getName()));
planner.addMaterialization(sumMaterialization);
The resulting plan is:
LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
  LogicalTableScan(table=[[test, test_table]])

PhysicalExchange(distribution=[single])
  PhysicalProject(user=[$0], source=[$1], sum_resp_time=[CAST($2):INTEGER NOT NULL])
    PhysicalTableScan(table=[[test, test_table_preagg]], condition=[null])
The problem with this plan is that it substitutes the materialized view directly and never re-aggregates the per-shard partial results of the materialized view.
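To make the failure concrete, here is a hedged sketch in plain Java (not Calcite API; the `DirectSubstitution`/`rowsFor` names and the table contents are invented for illustration): since every shard stored its own partial sum, a plan that merely scans and unions test_table_preagg emits one row per shard for a group, rather than one merged row per group.

```java
import java.util.List;
import java.util.Map;

public class DirectSubstitution {
    // Hypothetical contents of test_table_preagg: each shard stored its
    // own partial sum, so the same group occurs across the shard scans.
    static final List<Map.Entry<String, Integer>> PREAGG = List.of(
            Map.entry("alice|1", 15),  // partial sum from shard 1
            Map.entry("alice|1", 7),   // partial sum from shard 2
            Map.entry("bob|2", 3));    // partial sum from shard 2

    // Number of rows a plain scan-and-union plan emits for one group key.
    static long rowsFor(String key) {
        return PREAGG.stream().filter(e -> e.getKey().equals(key)).count();
    }

    public static void main(String[] args) {
        // The first rewrite just scans the view: (alice, 1) shows up twice,
        // whereas a correct plan must re-aggregate and emit a single row
        // with sum 15 + 7 = 22.
        System.out.println(rowsFor("alice|1")); // prints 2
    }
}
```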
Second
RelOptMaterialization sumMaterialization = new RelOptMaterialization(
    builder.scan(SCHEMA_NAME, TEST_TABLE_PREAGG.getName())
        .aggregate(builder.groupKey("user", "source"),
            builder.sum(false, "sum_resp_time", builder.field("sum_resp_time")))
        .build(),
    builder.scan(SCHEMA_NAME, TEST_TABLE.getName())
        .aggregate(builder.groupKey("user", "source"),
            builder.sum(false, "sum_resp_time", builder.field("resp_time")))
        .build(),
    null, Arrays.asList(SCHEMA_NAME, TEST_TABLE_PREAGG.getName()));
planner.addMaterialization(sumMaterialization);
The resulting plan is:
LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
  LogicalTableScan(table=[[test, test_table]])

PhysicalExchange(distribution=[single])
  PhysicalProject(user=[$0], source=[$1], sum_resp_time=[CAST($2):INTEGER NOT NULL])
    PhysicalAggregate(group=[{0, 1}], sum_resp_time=[$SUM0($2)], step=[FINAL])
      PhysicalExchange(distribution=[single])
        PhysicalAggregate(group=[{0, 1}], sum_resp_time=[$SUM0($2)], step=[PARTIAL])
          PhysicalTableScan(table=[[test, test_table_preagg]], condition=[null])
There are two problems with this plan:
- The Partial aggregate is superfluous, because the per-shard aggregation was already performed when the materialization was written
- The topmost PhysicalExchange(distribution=[single]) is redundant, because the inner Exchange has already changed the data distribution to Single
Third
Introduce a Partial/Final concept into LogicalAggregate, and define the materialized view as the Partial result:
RelOptMaterialization sumMaterialization = new RelOptMaterialization(
    builder.scan(SCHEMA_NAME, TEST_TABLE_PREAGG.getName()).build(),
    builder.scan(SCHEMA_NAME, TEST_TABLE.getName())
        .aggregate(builder.groupKey("user", "source"),
            builder.sum(false, "sum_resp_time", builder.field("resp_time")),
            Step.PARTIAL)
        .build(),
    null, Arrays.asList(SCHEMA_NAME, TEST_TABLE_PREAGG.getName()));
planner.addMaterialization(sumMaterialization);
The final plan is not satisfactory. Reading the MaterializedViewRule implementation, I found that its rewrite works on the top-level Aggregate abstraction, so this attempt cannot work.
Question
For the above scenario, the final plan I want to get is:
LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
  LogicalTableScan(table=[[test, test_table]])

PhysicalAggregate(group=[{0, 1}], sum_resp_time=[$SUM0($2)], step=[FINAL])
  PhysicalExchange(distribution=[single])
    PhysicalTableScan(table=[[test, test_table_preagg]], condition=[null])
How should I define the materialized view to get this plan?