I am trying to read data from a PostgreSQL table in parallel. I am using a timestamp column as the partition column and providing values for the lower bound, upper bound, and numPartitions. Spark creates multiple queries to read the data in parallel, but it is not pushing the filter down to the PostgreSQL database. When I run the explain command on the DataFrame, there is nothing under PushedFilters in the physical plan. I have also tried applying a filter clause after the load method, but it still did not push the filter down.
Option 1: Here I am not using a filter condition.
// jdbcurl and query are defined earlier; query is the subquery
// "( SELECT columnnames FROM schema.Transaction ) a" shown in the plans below.
val df = spark.read
  .format("jdbc")
  .option("url", jdbcurl)
  .option("dbtable", query)
  .option("partitionColumn", "transactionbegin")
  .option("numPartitions", 12)
  .option("driver", "org.postgresql.Driver")
  .option("fetchsize", 50000)
  .option("user", "user")
  .option("password", "password")
  .option("lowerBound", "2018-01-01 00:00:00")
  .option("upperBound", "2018-12-31 23:59:00")
  .load()
Explain Plan output
== Physical Plan ==
*(1) Scan JDBCRelation(( SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] PushedFilters: [], ReadSchema: struct<columnnames>
Now, if I run explain on df there is nothing under PushedFilters, but the queries I can see in PostgreSQL via pg_stat_activity show 12 different queries, each with a WHERE condition. Here is one of them:
SELECT 1 FROM ( SELECT columnnames
FROM schema.Transaction ) a WHERE "transactionbegin" >= '2018-03-02 19:59:50' AND "transactionbegin" < '2018-04-02 05:59:45'
I am a little confused here about whether the records are being filtered in PostgreSQL or in Spark: according to the explain plan there is nothing under PushedFilters, but based on the queries being generated it looks like the data is being filtered in PostgreSQL.
Option 2: Using a filter condition.
val df = spark.read
  .format("jdbc")
  .option("url", jdbcurl)
  .option("dbtable", query)
  .option("partitionColumn", "transactionbegin")
  .option("numPartitions", 12)
  .option("driver", "org.postgresql.Driver")
  .option("fetchsize", 50000)
  .option("user", "user")
  .option("password", "password")
  .option("lowerBound", "2018-01-01 00:00:00")
  .option("upperBound", "2018-12-31 23:59:00")
  .load()
  .filter("transactionbegin between cast('2018-01-01 00:00:00' as timestamp) and cast('2018-12-31 23:59:00' as timestamp)")
Explain plan for the above data frame
== Physical Plan ==
*(1) Scan JDBCRelation(( SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames]
PushedFilters: [*IsNotNull(transactionbegin), *GreaterThanOrEqual(transactionbegin,2018-01-01 00:00:00.0),..., ReadSchema: struct<columnnames>
One of the queries from PostgreSQL, captured with pg_stat_activity:
SELECT 1 FROM ( SELECT columnnames
FROM schema.Transaction ) a
WHERE (("transactionbegin" IS NOT NULL) AND ("transactionbegin" >= '2018-01-01 00:00:00.0')
AND ("transactionbegin" <= '2018-12-31 23:59:00.0')) AND
("transactionbegin" >= '2018-06-02 01:59:35' AND "transactionbegin" < '2018-07-02 11:59:30')
What I want to understand is why, when I provide the partition column and the lower and upper bounds, the filter is not pushed down to the database, but after applying an explicit filter with the values cast to timestamp it is pushed down. Also, shouldn't the framework be smart enough to treat the lowerBound and upperBound values as a range filter on the timestamp column?
What is the most efficient way of dealing with this when a huge volume of data needs to be read even after the filtering condition?
That's because lowerBound and upperBound don't filter your query; together with numPartitions they only determine the partition stride. From the docs: they are not used for filtering the rows in the table, so all rows in the table will be partitioned and returned.
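If you genuinely only need rows in that range, a minimal sketch of one common way to handle it (reusing the jdbcurl, columnnames, schema.Transaction, and credential placeholders from the question) is to either keep the explicit filter from Option 2, which Spark pushes down, or bake the range predicate directly into the subquery passed as dbtable, so PostgreSQL applies it in every partition query:

// Sketch only: the range predicate is written into the subquery used as dbtable.
// Spark still appends its per-partition stride predicates on top of this.
val bounds = ("2018-01-01 00:00:00", "2018-12-31 23:59:00")

val boundedQuery =
  s"""(SELECT columnnames
     |   FROM schema.Transaction
     |  WHERE transactionbegin >= '${bounds._1}'
     |    AND transactionbegin <= '${bounds._2}') a""".stripMargin

val dfBounded = spark.read
  .format("jdbc")
  .option("url", jdbcurl)
  .option("dbtable", boundedQuery)
  .option("partitionColumn", "transactionbegin")
  .option("numPartitions", 12)
  .option("lowerBound", bounds._1)
  .option("upperBound", bounds._2)
  .option("driver", "org.postgresql.Driver")
  .option("fetchsize", 50000)
  .option("user", "user")
  .option("password", "password")
  .load()

Either way, each of the 12 partition queries only scans its slice of the bounded range; the difference is just whether the restriction arrives as a pushed-down filter or as part of the subquery text.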