Hive number of reducers in group by and count(distinct)


I was told that count(distinct) may result in data skew because only one reducer is used.

I ran a test with two queries on a table with 5 billion rows:

Query A:

select count(distinct columnA) from tableA

Query B:

select count(columnA) from
(select columnA from tableA group by columnA) a

Query A takes about 1000-1500 seconds, while query B takes 500-900 seconds. This result seems expected.

However, I noticed that both queries use 370 mappers and 1 reducer, and they have almost the same cumulative CPU seconds. This suggests there is no genuine difference between them, and the time difference may be caused by cluster load.

I am confused about why both queries use only 1 reducer; I even tried setting mapreduce.job.reduces, but it did not work. By the way, if both use 1 reducer, why do people suggest not using count(distinct)? It seems data skew is unavoidable.

Best answer:

Both queries use the same number of mappers, which is expected, and a single final reducer, which is also expected because the result is a single scalar count. Multiple reducers on the same vertex run independently and in isolation, and each produces its own output; this is why the last stage has a single reducer. The difference is in the plan.

When the first query executes, the single reducer reads the output of every mapper and computes the distinct count over all the data, so it has to process too much data.

The second query uses intermediate aggregation, so the final reducer receives partially aggregated data (the distinct values already aggregated in the previous step). The final reducer only needs to aggregate these partial results again to get the final result, which can be much less data than in the first case.

As of Hive 1.2.0 there is an optimization for count(distinct), so you do not need to rewrite the query. Set this property: hive.optimize.distinct.rewrite=true
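A minimal sketch of applying this in a Hive session, reusing the table and column names from the question. It assumes your Hive version and configuration actually honor this property; check the plan with EXPLAIN to confirm the rewrite takes effect.

```sql
-- Enable the count(distinct) rewrite (available as of Hive 1.2.0)
SET hive.optimize.distinct.rewrite=true;

-- Query A can then stay as written, without a manual GROUP BY subquery
select count(distinct columnA) from tableA;
```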

There is also map-side aggregation: mappers can pre-aggregate data and produce distinct values within the scope of their own portion of the data (their splits). Set this property to allow map-side aggregation: hive.map.aggr=true
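For example, the property can be set for the session before running query B from the question. This is only a sketch; how much the mappers actually reduce their output depends on how many duplicate values each split contains.

```sql
-- Allow mappers to pre-aggregate values within their own splits
SET hive.map.aggr=true;

-- Query B from the question: GROUP BY first, then count the distinct values
select count(columnA) from
(select columnA from tableA group by columnA) a;
```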

Use the EXPLAIN command to check the difference between the execution plans.
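A sketch of printing both plans, using the names from the question. The exact output format depends on your Hive version and execution engine; the things to compare are the number of stages and what each reduce stage aggregates.

```sql
-- Plan for query A: count(distinct) over all mapper output
EXPLAIN
select count(distinct columnA) from tableA;

-- Plan for query B: intermediate GROUP BY stage, then the final count
EXPLAIN
select count(columnA) from
(select columnA from tableA group by columnA) a;
```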

See also this answer: https://stackoverflow.com/a/51492032/2700344