Writing DeferredDataFrame to CloudSQL for PostgreSQL

72 Views Asked by At

Following are the steps which I need to perform in a Dataflow Pipeline:

  1. Read CSV files with common prefix from GCS into a Deferred Dataframe
  2. Perform transformations on the Dataframe
  3. Write the Deferred Dataframe to Cloud SQL for PostgreSQL

Following is the code:

import argparse
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.dataframe.io import read_csv
from google.cloud import storage
from apache_beam.io.jdbc import WriteToJdbc
from sqlalchemy import create_engine, inspect

DB_USER = "username"
DB_PASS = "password"

# Function to check if the master tables exists
def check_master_table_exists(connection_config, master_table_name):
    with create_engine(connection_config['jdbc_url']).connect() as connection:
        inspector = inspect(connection)
        return master_table_name in inspector.get_table_names()

# Define PostgreSQL connection properties
connection_config = {
    'driver_class': 'org.postgresql.Driver',
    'jdbc_url': 'jdbc:postgresql://[Private_IP]:5432/dti',
    'user': DB_USER,
    'password': DB_PASS,
}

# Define table names
master_table_name = 'master_eim_data'
temp_table_name = 'tmp_eim_tbl'

# Function to perform merge operation
class MergeData(beam.DoFn):
    def __init__(self, connection_config, master_table_name):
        self.connection_config = connection_config
        self.master_table_name = master_table_name

    def process(self, element):
        master_table_exists = check_master_table_exists(self.connection_config, self.master_table_name)

        if not master_table_exists:
            logging.info(f'Table {self.master_table_name} does not exist. Please create the table')
        else:
            element | WriteToJdbc(
                table_name=self.master_table_name,
                jdbc_url=connection_config['jdbc_url'],
                driver_class_name=connection_config['driver_class'],
                username=connection_config['user'],
                password=connection_config['password']
            )

def main(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input_bucket",
        default="raw_data_dti",
        help="Input bucket",
    )
    parser.add_argument("--prefix", dest="prefix", help="CSV file prefix")
    known_args, pipeline_args = parser.parse_known_args(argv)
    
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as pipeline:
        deferred_dataframe = (
            pipeline
            | read_csv('gs://{}/{}*.csv'.format(known_args.input_bucket, known_args.prefix))
        )

        logging.info(type(deferred_dataframe))

        deferred_dataframe | WriteToJdbc(
            table_name= temp_table_name,
            jdbc_url=connection_config['jdbc_url'],
            driver_class_name=connection_config['driver_class'],
            username=connection_config['user'],
            password=connection_config['password']
        )
        
        pipeline | 'Perform Merge' >> beam.ParDo(MergeData(connection_config, master_table_name))
        

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    main()

The error which I am receiving is as follows:

RuntimeError: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto:

name: "Department" type { nullable: true logical_type { urn: "beam:logical:pythonsdk_any:v1" } }

Can someone please help me on this. I am welcome to suggestions reg. better ways to accomplish my requirements.

0

There are 0 best solutions below