Following are the steps I need to perform in a Dataflow pipeline (a simplified sketch of the intended flow follows the list, and the full code after that):
- Read CSV files with a common prefix from GCS into a deferred DataFrame
- Perform transformations on the DataFrame
- Write the deferred DataFrame to Cloud SQL for PostgreSQL
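At a high level, this is the shape I am going for. A simplified sketch (the bucket, prefix, table and connection values below are placeholders, the transformation step is elided, and I am not certain that applying WriteToJdbc directly to the deferred DataFrame is even the right way to do step 3):

import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.io.jdbc import WriteToJdbc

with beam.Pipeline() as p:
    # 1. Read every CSV sharing a common prefix into a deferred DataFrame
    df = p | read_csv('gs://<bucket>/<prefix>*.csv')

    # 2. pandas-style transformations on the deferred DataFrame (elided here)
    # df = df.dropna()

    # 3. Write the result to Cloud SQL for PostgreSQL over JDBC
    df | WriteToJdbc(
        table_name='<table>',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://<host>:5432/<database>',
        username='<user>',
        password='<password>',
    )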
The actual code is as follows:
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.io.jdbc import WriteToJdbc
from sqlalchemy import create_engine, inspect

DB_USER = "username"
DB_PASS = "password"

# Check whether the master table exists
def check_master_table_exists(connection_config, master_table_name):
    with create_engine(connection_config['jdbc_url']).connect() as connection:
        inspector = inspect(connection)
        return master_table_name in inspector.get_table_names()

# Define PostgreSQL connection properties
connection_config = {
    'driver_class': 'org.postgresql.Driver',
    'jdbc_url': 'jdbc:postgresql://[Private_IP]:5432/dti',
    'user': DB_USER,
    'password': DB_PASS,
}

# Define table names
master_table_name = 'master_eim_data'
temp_table_name = 'tmp_eim_tbl'

# DoFn that performs the merge operation
class MergeData(beam.DoFn):
    def __init__(self, connection_config, master_table_name):
        self.connection_config = connection_config
        self.master_table_name = master_table_name

    def process(self, element):
        master_table_exists = check_master_table_exists(self.connection_config, self.master_table_name)
        if not master_table_exists:
            logging.info(f'Table {self.master_table_name} does not exist. Please create the table')
        else:
            element | WriteToJdbc(
                table_name=self.master_table_name,
                jdbc_url=self.connection_config['jdbc_url'],
                driver_class_name=self.connection_config['driver_class'],
                username=self.connection_config['user'],
                password=self.connection_config['password']
            )

def main(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input_bucket",
        default="raw_data_dti",
        help="Input bucket",
    )
    parser.add_argument("--prefix", dest="prefix", help="CSV file prefix")
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read all CSVs matching the prefix from GCS into a deferred DataFrame
        deferred_dataframe = (
            pipeline
            | read_csv('gs://{}/{}*.csv'.format(known_args.input_bucket, known_args.prefix))
        )
        logging.info(type(deferred_dataframe))

        # Stage the deferred DataFrame into a temporary table via JDBC
        deferred_dataframe | WriteToJdbc(
            table_name=temp_table_name,
            jdbc_url=connection_config['jdbc_url'],
            driver_class_name=connection_config['driver_class'],
            username=connection_config['user'],
            password=connection_config['password']
        )

        # Merge the staged data into the master table
        pipeline | 'Perform Merge' >> beam.ParDo(MergeData(connection_config, master_table_name))

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    main()
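For context, I launch the job roughly like this (the script name, prefix value, project, region and temp bucket are placeholders):

python dataflow_pipeline.py \
    --input raw_data_dti \
    --prefix <prefix> \
    --runner DataflowRunner \
    --project <project-id> \
    --region <region> \
    --temp_location gs://<temp-bucket>/tmp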
The error I am receiving is as follows:
RuntimeError: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto:
name: "Department" type { nullable: true logical_type { urn: "beam:logical:pythonsdk_any:v1" } }
Can someone please help me with this? I am also open to suggestions regarding better ways to accomplish these requirements.