I'm using Spark 2.4 on Dataproc, running a batch job every 15 minutes that reads data from a BQ table, aggregates it (sum), and overwrites another BQ table via pyspark.sql.
If I query the table in Spark, the data appears to be behind by roughly an hour; more precisely, it cuts off roughly an hour before now. If I run the exact same query against the same table in the BQ web console, all the data is there and up to date. Am I doing something wrong, or is this expected behavior of the connector?
Here's essentially the code I'm using:
orders_by_hour_query = """
SELECT
_id as app_id,
from_utc_timestamp(DATE_TRUNC('HOUR', created_at), 'America/Los_Angeles') as ts_hour,
SUM(total_price_usd) as gmv,
COUNT(order_id) as orders
FROM `orders`
WHERE DATE(from_utc_timestamp(created_at, 'America/Los_Angeles')) BETWEEN "2020-11-23" AND "2020-11-27"
GROUP BY 1, 2
ORDER BY 1, 2 ASC
"""
orders_df = spark.read.format("bigquery").load(bq_dataset+".orders")
orders_df.createOrReplaceTempView("orders")
orders_by_hour_df = spark.sql(orders_by_hour_query)
EDIT: The hourly cut-off appears to be almost arbitrary. For instance, it's currently "2020-11-25 06:31 UTC", but the max timestamp returned from BQ via the Spark connector is "2020-11-25 05:56:39 UTC".
More Info on that table:
Table size: 2.65 GB
Long-term storage size: 1.05 GB
Number of rows: 4,120,280
Created: Jun 3, 2020, 4:56:11 PM
Table expiration: Never
Last modified: Nov 24, 2020, 10:07:54 PM
Data location: US
Table type: Partitioned
Partitioned by: Day
Partitioned on field: created_at
Partition filter: Not required
Streaming buffer statistics:
Estimated size: 1.01 MB
Estimated rows: 1,393
Earliest entry time: Nov 24, 2020, 9:57:00 PM
Thanks in advance!
It looks like the missing data might still be sitting in the streaming buffer and has not yet been flushed to BQ storage.
This means you can query it directly in BQ, but not with the BQ Spark connector, since the connector reads through the BigQuery Storage API (https://cloud.google.com/bigquery/docs/reference/storage), which does not return rows that are still in the streaming buffer.
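If you want to confirm that, you can inspect the table's streaming buffer with the BigQuery client library. A minimal sketch, assuming google-cloud-bigquery is installed and using a placeholder table ID:

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my-project.my_dataset.orders")  # placeholder table ID

# streaming_buffer is None when nothing is pending in the buffer
buf = table.streaming_buffer
if buf:
    print("Estimated rows in buffer:", buf.estimated_rows)
    print("Estimated bytes:", buf.estimated_bytes)
    print("Oldest entry time:", buf.oldest_entry_time)
else:
    print("No data currently in the streaming buffer")

This corresponds to the "Streaming buffer statistics" you already see in the console; rows reported there won't be visible to the connector until they're flushed.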
As a workaround, since it's only about an hour of data, if that data is small enough you could simply run the query through the BQ API directly and convert the resulting pandas DataFrame to a Spark DataFrame, something like the sketch below.
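A rough sketch of that approach, assuming the google-cloud-bigquery and pandas libraries are available on the cluster; the dataset/table name is a placeholder and the BigQuery SQL is my rough translation of your Spark SQL, so double-check the timezone handling:

from google.cloud import bigquery

bq_client = bigquery.Client()

# Run the aggregation as a regular BigQuery query job; query jobs do read the
# streaming buffer, so the freshest rows are included.
sql = """
SELECT
  _id AS app_id,
  DATETIME(TIMESTAMP_TRUNC(created_at, HOUR), 'America/Los_Angeles') AS ts_hour,
  SUM(total_price_usd) AS gmv,
  COUNT(order_id) AS orders
FROM `my_dataset.orders`
WHERE DATE(created_at, 'America/Los_Angeles') BETWEEN '2020-11-23' AND '2020-11-27'
GROUP BY 1, 2
"""

# Pull the (small) result set down as pandas, then hand it to Spark
pandas_df = bq_client.query(sql).to_dataframe()
orders_by_hour_df = spark.createDataFrame(pandas_df)

Since the result is only a handful of rows per app per hour, the pandas round trip should be cheap, and the rest of your pipeline can keep using orders_by_hour_df as before.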