Using Spark BigQuery Connector on Dataproc and data appears to be delayed by an hour


I'm using Spark 2.4 on Dataproc, running a batch job every 15 minutes that reads some data from a BigQuery table, aggregates it (sum), and overwrites another BigQuery table with the result via pyspark.sql.

If I query the table in Spark, the data appears to be behind by roughly an hour; more precisely, it cuts off roughly an hour before now. If I run the exact same query against the same table in the BigQuery web console, all the data is there and up to date. Am I doing something wrong, or is this expected behavior of the connector?

Here's essentially the code I'm using:

orders_by_hour_query = """
SELECT
  _id AS app_id,
  from_utc_timestamp(DATE_TRUNC('HOUR', created_at), 'America/Los_Angeles') AS ts_hour,
  SUM(total_price_usd) AS gmv,
  COUNT(order_id) AS orders
FROM `orders`
WHERE DATE(from_utc_timestamp(created_at, 'America/Los_Angeles')) BETWEEN "2020-11-23" AND "2020-11-27"
GROUP BY 1, 2
ORDER BY 1, 2 ASC
"""

orders_df = spark.read.format("bigquery").load(bq_dataset+".orders")
orders_df.createOrReplaceTempView("orders")
orders_by_hour_df = spark.sql(orders_by_hour_query)
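
The overwrite back to the destination BigQuery table isn't shown above; it looks roughly like the sketch below, where the destination table name and the temporary GCS bucket are placeholders rather than my actual values:

# Rough sketch of the write-back step (placeholder table and bucket names):
# the connector stages the result in GCS and then overwrites the destination table.
orders_by_hour_df.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "my-temp-bucket") \
    .mode("overwrite") \
    .save(bq_dataset + ".orders_by_hour")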

EDIT: The hourly cut-off appears to be almost arbitrary. For instance, the current time is "2020-11-25 06:31 UTC", but the max timestamp returned when querying the table via the Spark connector is "2020-11-25 05:56:39 UTC".

More info on that table:

Table size: 2.65 GB
Long-term storage size: 1.05 GB
Number of rows: 4,120,280
Created: Jun 3, 2020, 4:56:11 PM
Table expiration: Never
Last modified: Nov 24, 2020, 10:07:54 PM
Data location: US
Table type: Partitioned
Partitioned by: Day
Partitioned on field: created_at
Partition filter: Not required

Streaming buffer statistics:

Estimated size: 1.01 MB
Estimated rows: 1,393
Earliest entry time: Nov 24, 2020, 9:57:00 PM

Thanks in advance!

1 Answer

It looks like the missing data is still in the streaming buffer and has not yet reached BigQuery managed storage.

This means you can query it from BigQuery directly, but not with the Spark BigQuery connector, since the connector reads via the BigQuery Storage API (https://cloud.google.com/bigquery/docs/reference/storage), which does not return rows that are still in the streaming buffer.
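
If you want to confirm this, you can inspect the table's streaming buffer metadata with the BigQuery client library. A minimal sketch, assuming placeholder project/dataset/table names:

from google.cloud import bigquery

# Check how much data is still sitting in the streaming buffer
# (placeholder table reference).
bq = bigquery.Client()
table = bq.get_table("my-project.my_dataset.orders")

buf = table.streaming_buffer
if buf is not None:
    print("Estimated rows in streaming buffer:", buf.estimated_rows)
    print("Oldest entry time:", buf.oldest_entry_time)
else:
    print("No streaming buffer; all rows are in managed storage.")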

As a workaround, you can try something like the code below. Since it's only about an hour of data, if that data is small enough you could also simply use the BigQuery API directly and convert the resulting pandas DataFrame to a Spark DataFrame.

from google.cloud import bigquery
from pyspark import StorageLevel


def bq2df(query):
    # Run the query through the BigQuery API so it can see the streaming buffer,
    # then read the job's destination (temporary) table with the Spark connector.
    bq = bigquery.Client()
    query_job = bq.query(query)
    query_job.result()  # wait for the query to finish

    df = spark.read.format('bigquery') \
        .option('dataset', query_job.destination.dataset_id) \
        .load(query_job.destination.table_id) \
        .persist(StorageLevel.MEMORY_AND_DISK)

    return df
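
For the pandas alternative mentioned above, a minimal sketch could look like the following; note the query would need to be rewritten in BigQuery Standard SQL (Spark SQL functions like from_utc_timestamp are not available in BigQuery), and the result must be small enough to fit in driver memory. The query text here is a placeholder:

from google.cloud import bigquery

# Equivalent aggregation written in BigQuery Standard SQL (placeholder query text).
bq_query = """SELECT ... FROM `my-project.my_dataset.orders` WHERE ... GROUP BY ..."""

bq = bigquery.Client()
pandas_df = bq.query(bq_query).to_dataframe()   # runs in BigQuery, so it sees the streaming buffer
orders_by_hour_df = spark.createDataFrame(pandas_df)  # convert to a Spark DataFrame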