Loading Pub/Sub Message to BigQuery with Python


I am trying to pull Pub/Sub messages and load them into BigQuery with Python. I can pull the messages, but I have not been able to load them into BigQuery. Here is the code I wrote. Do you know how to load these messages into BigQuery using Python?

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import os
import time
import json
import pandas as pd
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"

# TODO(developer)
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
print(subscription_path)

dataset_id="message"
table_id="pubsub_message"

def write_messages_to_bq(dataset_id, table_id, subscription_path):
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = client.get_table(table_ref)

    errors = client.insert_rows(table, subscription_path)
    if not errors:
        print('Loaded {} row(s) into {}:{}'.format(len(subscription_path), dataset_id, table_id))
    else:
        print('Errors:')
        for error in errors:
            print(error)

There is 1 best solution below

Your code can't work as written. You can't pass a Pub/Sub subscription path to the BigQuery API to load the data: insert_rows expects a list of rows, not a subscription. You need a different approach.
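To make the mismatch concrete: `insert_rows` takes a list of rows (dicts or tuples matching the table schema), whereas a subscription path is only a string. Below is a minimal sketch of turning one message payload into such a row. It assumes the publisher sends UTF-8 JSON objects whose keys match the table's column names; the `payload_to_row` helper and the sample field names are hypothetical and not part of either library.

```python
import json


def payload_to_row(data: bytes) -> dict:
    """Decode a Pub/Sub payload (bytes) into a dict usable as a BigQuery row.

    Assumption: the payload is a UTF-8 encoded JSON object whose keys
    match the column names of the destination table.
    """
    row = json.loads(data.decode("utf-8"))
    if not isinstance(row, dict):
        raise ValueError("expected a JSON object, got " + type(row).__name__)
    return row


# Hypothetical payload, shaped like the bytes found in message.data
sample = b'{"id": 1, "text": "hello"}'
rows = [payload_to_row(sample)]  # insert_rows takes a list of rows
print(rows)
```

If your messages are not JSON, only the decoding step changes; the result still has to be a list of rows that fits the table schema.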

You need to write the messages to BigQuery one at a time (since you are using the streaming API, that is not a concern). Do this in your callback, the function that handles each message as it is received: write the message to BigQuery and, only if the write succeeds, ack the message.
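The ack-after-write rule described above can be sketched independently of the Google clients. In this sketch `insert_row` is a hypothetical wrapper around the real write (for example `lambda row: client.insert_rows(table, [row])`) that returns a list of errors, empty on success. `FakeMessage` is a stand-in used only to exercise the logic, and the JSON payload format is an assumption. Calling `nack()` on failure is one possible choice; it asks Pub/Sub to redeliver the message.

```python
import json
from typing import Callable, List


def make_callback(insert_row: Callable[[dict], List[dict]]):
    """Build a Pub/Sub callback that acks only after a successful write."""
    def callback(message):
        try:
            # Assumption: the payload is a UTF-8 JSON object (one table row).
            row = json.loads(message.data.decode("utf-8"))
            errors = insert_row(row)
        except Exception as exc:  # decoding or write failure
            errors = [{"exception": repr(exc)}]
        if errors:
            print("Errors:", errors)
            message.nack()  # not written: ask for redelivery
        else:
            message.ack()   # written: safe to acknowledge
    return callback


class FakeMessage:
    """Stand-in for a received Pub/Sub message, for local testing only."""
    def __init__(self, data: bytes):
        self.data = data
        self.state = None

    def ack(self):
        self.state = "acked"

    def nack(self):
        self.state = "nacked"


ok = FakeMessage(b'{"id": 1}')
make_callback(lambda row: [])(ok)                       # write succeeds
failed = FakeMessage(b'{"id": 2}')
make_callback(lambda row: [{"reason": "bad"}])(failed)  # write reports errors
print(ok.state, failed.state)
```

Keeping the write behind a small function like this also makes the callback easy to test without touching BigQuery.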

Here is your code refactored (not tested; it is only meant to show the changes to make):

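from concurrent.futures import TimeoutError
from google.cloud import bigquery, pubsub_v1
import os
import json
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"

# TODO(developer)
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

dataset_id="message"
table_id="pubsub_message"

# Create the BigQuery client and fetch the table once, before subscribing,
# so that the callback can reuse them for every message.
client = bigquery.Client()
table = client.get_table(f"{project_id}.{dataset_id}.{table_id}")

# The callback must be defined before it is passed to subscribe().
def callback(message):
    print(f"Received {message}.")
    # Assumption: the payload is a JSON object whose keys match the table columns.
    row = json.loads(message.data.decode("utf-8"))
    # insert_rows expects a list of rows, so wrap the single row in a list.
    errors = client.insert_rows(table, [row])
    if not errors:
        print('Loaded 1 row into {}:{}'.format(dataset_id, table_id))
        # Ack only after the row has been written.
        message.ack()
    else:
        print('Errors:')
        for error in errors:
            print(error)
        # Not written: nack so that Pub/Sub redelivers the message.
        message.nack()

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Block the main thread so the callback keeps running until the timeout.
with subscriber:
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # stop pulling messages
        streaming_pull_future.result()  # wait for the shutdown to finish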