Join a small in-memory Spark table with a huge Hive table without bringing all records from Hive to Spark


Use case: I have a small table (~1000 rows) available in Spark, and a huge Hive table (20 billion records). Let's call the small table base and the huge table main. The base table has a column 'id', and I need to fetch all records from the main table where main.external_id equals base.id. Both the external_id and id columns contain only unique values.

Problem: The obvious way is to register the base table as a temp table in Spark and use something like

sparkSession.sql("select * from base_table JOIN main_table ON base_table.id = main_table.external_id")

However, this would mean that Spark fetches all rows from the huge Hive table and brings them into memory, which feels very expensive considering we need only around 1000 matching rows. I am looking for a way to minimize this network data transfer.
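For context, the plain join above can at least avoid shuffling the large side by broadcasting the small table; a minimal sketch (table names as registered above), though every row of main_table is still read by the executors:

import org.apache.spark.sql.functions.broadcast

val baseDf = sparkSession.table("base_table")   // ~1000 rows
val mainDf = sparkSession.table("main_table")   // 20 billion rows in Hive

// broadcast the small side: no shuffle of main_table, but it is still fully scanned
val joined = mainDf.join(broadcast(baseDf), mainDf("external_id") === baseDf("id"))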

What I have tried

  1. Partitioning/Bucketing: This was the first option we considered, but both were infeasible. Partitioning works best when a column has a small set of discrete values (like city or country), whereas the 'id' column is a unique key. For bucketing, the issue is that we would need a huge number of buckets, which means a high number of files and can create its own issues.

  2. JDBC query via HiveServer2: As of now, we are able to run read queries against the Hive engine via the JDBC driver. I was wondering if there was a way to send the base table from Spark to the Hive engine and execute a broadcast join there, so that the network shuffle only involves the smaller table and we don't need to bring the bigger table into Spark's memory (a rough sketch of this idea follows below). However, I haven't been able to find anything that helps implement this.

(Obviously we could write the base table to Hive first and then do the join, but according to the team, the Hive write path is not very performant and has caused a few issues in the past.)
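A rough sketch of the JDBC idea from point 2, assuming a HiveServer2 endpoint at hive-host:10000 (a placeholder) and the Hive JDBC driver on the classpath; Spark 2.3's JDBC source accepts a subquery via the dbtable option, so the filter would be executed by Hive and only matching rows would travel over the wire:

import org.apache.spark.sql.Encoders

// collect the ~1000 ids on the driver and inline them into the pushed-down query
val ids = sparkSession.table("base_table").select("id").as(Encoders.INT).collect().mkString(",")

val pushedDown = sparkSession.read
  .format("jdbc")
  .option("url", "jdbc:hive2://hive-host:10000/default")      // placeholder endpoint
  .option("driver", "org.apache.hive.jdbc.HiveDriver")
  .option("dbtable", s"(select * from main_table where external_id in ($ids)) t")
  .load()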

Does anyone have a solution to the problem above, or another way to achieve the same result?

P.S.: I'm using Spark 2.3.2 and the same version for the spark-sql, spark-hive, and hive-jdbc jars.


1 Answer


If only the main table's rows are required, an "in" clause can be used:

import org.apache.spark.sql.Encoders

// collect the ~1000 ids on the driver, then push the filter down as an IN clause
val ids = base_table.select("id").as(Encoders.INT).collect().mkString(",")
sparkSession.sql(s"select * from main_table where external_id in ($ids)")
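As a side note, a minimal DataFrame-API sketch of the same idea (assuming base_table and main_table are the registered names from the question), which avoids interpolating the id list into a SQL string:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.col

// same idea: collect the ids on the driver and filter the big table with isin
val idSeq = sparkSession.table("base_table").select("id").as(Encoders.INT).collect().toSeq
val result = sparkSession.table("main_table").where(col("external_id").isin(idSeq: _*))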