Question about Calcite Aggregate materialized view

33 Views Asked by At

Problem Summary

If the materialized view of a GroupAggregate is multi-shards, and compute group aggregation within the shard (only considering sum/count/max/min), so there will be duplicate groups between shards, how to use the calcite materialized view to correctly rewrite the plan in this data distribution situation?

Detailed Explanation

Processing of original data sources

Suppose there is a data table test_table. Its schema is

test_table

user           varchar
source         int
resp_time      int
request_body   varcha

During the writing process, the data of this table will be uniformly written to multiple shards (which can be considered round-robin), as shown in the figure below.

original data process

When doing a GroupAggregate query on this table

SELECT user, source, sum(resp_time)
FROM test_table
GROUP BY user, source

The query plan is as follows

LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
  LogicalTableScan(table=[[test, test_table]])


PhysicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], step=[FINAL])
  PhysicalExchange(distribution=[single])
    PhysicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], step=[PARTIAL])
      PhysicalTableScan(table=[[test, test_table]], condition=[null])

The PhysicalXXX nodes here are our own defined nodes, and the PhysicalAggregaterequires Singlefor RelDistribution, so an Exchangeoperator is added, and we also have an optimizer rule splits the PhysicalAggregateinto two stages: Partialand Final.

Processing of materialized view

Suppose there is a materialized view of the following SQL

SELECT user, source, sum(resp_time) as sum_resp_time
FROM test_table
GROUP BY user, source

In the writing process, the processing of the materialized view is uniformly routed to multiple shards, and compute group aggregation within the shard, as shown in the figure.

materialized view process

This also means that, from a global perspective, the aggregation result of each shard is only an intermediate result of global aggregation, and all the aggregation results of shard need to be aggregated again to obtain the final result.

The use of materialized view

Use calcite to perform the calculation of GroupAggregate, and the construction process of plan is as follows

RelNode root = builder
        .scan(SCHEMA_NAME, TEST_TABLE.getName())
        .aggregate(builder.groupKey("user", "source"), builder.sum(builder.field("resp_time")))
        .build();
System.out.println(RelOptUtil.toString(root));

RelOptPlanner planner = root.getCluster().getPlanner();   // VolcanoPlanner
RelOptUtil.registerDefaultRules(planner, true, false);
RelTraitSet desiredTraits = root.getCluster().traitSet()
        .replace(RelDistributions.SINGLETON)
        .replace(PhysicalConvention.INSTANCE);
root = planner.changeTraits(root, desiredTraits);
planner.setRoot(root);

For materialized view, I tried three approaches:

First
RelOptMaterialization sumMaterialization = new RelOptMaterialization(
    builder.scan(SCHEMA_NAME, TEST_TABLE_PREAGG.getName()).build(),
    builder.scan(SCHEMA_NAME, TEST_TABLE.getName())
            .aggregate(builder.groupKey("user", "source"), builder.sum(false, "sum_resp_time", builder.field("resp_time")))
            .build(),
    null, Arrays.asList(SCHEMA_NAME, TEST_TABLE_PREAGG.getName())
);
planner.addMaterialization(sumMaterialization);

The resulting plan is

LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
  LogicalTableScan(table=[[test, test_table]])

PhysicalExchange(distribution=[single])
  PhysicalProject(user=[$0], source=[$1], sum_resp_time=[CAST($2):INTEGER NOT NULL])
    PhysicalTableScan(table=[[test, test_table_preagg]], condition=[null])

The problem with this plan is that it directly replaces materialized views and lacks aggregation of multiple shards of materialized view

Second
RelOptMaterialization sumMaterialization = new RelOptMaterialization(
    builder.scan(SCHEMA_NAME, TEST_TABLE_PREAGG.getName())
    .aggregate(builder.groupKey("user", "source"), builder.sum(false, "sum_resp_time", builder.field("sum_resp_time")))
    .build(),
    builder.scan(SCHEMA_NAME, TEST_TABLE.getName())
            .aggregate(builder.groupKey("user", "source"), builder.sum(false, "sum_resp_time", builder.field("resp_time")))
            .build(),
    null, Arrays.asList(SCHEMA_NAME, TEST_TABLE_PREAGG.getName())
);
planner.addMaterialization(sumMaterialization);

The resulting plan is

LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
  LogicalTableScan(table=[[test, test_table]])

PhysicalExchange(distribution=[single])
  PhysicalProject(user=[$0], source=[$1], sum_resp_time=[CAST($2):INTEGER NOT NULL])
    PhysicalAggregate(group=[{0, 1}], sum_resp_time=[$SUM0($2)], step=[FINAL])
      PhysicalExchange(distribution=[single])
        PhysicalAggregate(group=[{0, 1}], sum_resp_time=[$SUM0($2)], step=[PARTIAL])
          PhysicalTableScan(table=[[test, test_table_preagg]], condition=[null])

There are two problems with this plan:

  1. Aggregate-Partial is superfluous and unnecessary because it is already done in the materialization written process
  2. The topmost MetricsExchange (distribution = [single]) is redundant because the L7 Exchange has changed the data distribution status to Single
Third

In the LogicalAggregate, the concept of Partial-Final is introduced, and when the materialized view is defined, it is defined as the Partial result

RelOptMaterialization sumMaterialization = new RelOptMaterialization(
    builder.scan(SCHEMA_NAME, TEST_TABLE_PREAGG.getName()).build(),
    builder.scan(SCHEMA_NAME, TEST_TABLE.getName())
            .aggregate(builder.groupKey("user", "source"), builder.sum(false, "sum_resp_time", builder.field("resp_time")), Step.PARTIAL)
            .build(),
    null, Arrays.asList(SCHEMA_NAME, TEST_TABLE_PREAGG.getName())
);
planner.addMaterialization(sumMaterialization);

The final plan is not satisfactory, I read MaterializedViewRuleimplementation and found that the rewrite inside is based on the top-level abstraction of Aggregateto do , the above attempt should not work.

Question

For the above scenario, the final plan I want to get should be


LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
  LogicalTableScan(table=[[test, test_table]])

PhysicalAggregate(group=[{0, 1}], sum_resp_time=[$SUM0($2)], step=[FINAL])
  PhysicalExchange(distribution=[single])
    PhysicalTableScan(table=[[test, test_table_preagg]], condition=[null])

How should I define materialized view?

0

There are 0 best solutions below