Oracle to BigQuery Tables


I am new to BigQuery and learning it. I have a requirement to load close to 300 tables from an Oracle source into BigQuery staging tables. What is the recommended way to load the data? I know I can use Dataflow for that, but do I have to create 300 Dataflow tasks, or can I create a single job that iterates over the tables? Please share your experience and other ways to do it. Thanks a lot.

Regards, Venkat.


There are 2 best solutions below

Joe (Best Answer)

In my experience: we wanted to migrate our data warehouse into BigQuery, and I didn't use Dataflow or any other tool. I just exported the tables into CSV files and then used Python code to iterate over the files and upload them into BigQuery: https://cloud.google.com/bigquery/docs/loading-data-local#python

Alternatively, you can upload them into GCS and then load them into BigQuery. If it's a daily operation, I think it's easier to maintain a single piece of code that iterates over a list of tables, extracts them, and appends them into BigQuery tables than to create 300 tasks.
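
A minimal sketch of that loop, assuming the tables have already been exported to a local directory as CSV files named table_name.csv (the directory path and dataset ID are placeholders, not from the original answer):

import os
from google.cloud import bigquery

client = bigquery.Client()
dataset_id = 'staging_dataset'   # assumption: target staging dataset
csv_dir = '/path/to/exports'     # assumption: directory containing <table_name>.csv files

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,          # CSV files exported with a header row
    autodetect=True,              # let BigQuery infer the schema
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

for file_name in os.listdir(csv_dir):
    if not file_name.endswith('.csv'):
        continue
    table_id = '{}.{}'.format(dataset_id, file_name[:-4])  # table named after the file
    with open(os.path.join(csv_dir, file_name), 'rb') as source_file:
        load_job = client.load_table_from_file(source_file, table_id, job_config=job_config)
    load_job.result()  # wait for this table before moving to the next one
    print('Loaded {}'.format(table_id))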

update:

Example code for reading data from Oracle into BigQuery using pandas-gbq:

import cx_Oracle  # Oracle driver used by SQLAlchemy's oracle dialect
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('oracle://user:password@host_or_scan_address:1521/ORACLE_SERVICE_NAME')

results = pd.read_sql('select * from table_name', engine, chunksize=5000)
if_exists = 'append'  # or 'replace'
schema = None  # specify a BigQuery schema here if you don't want schema autodetection
for result in results:
    result.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
                  table_schema=schema, if_exists=if_exists)

You can drop the chunksize argument if you want to load the data in one go, but this might consume a lot of memory if the table is big:



results = pd.read_sql('select * from table_name', engine)
if_exists = 'append'  # or 'replace'
schema = None  # specify a BigQuery schema here if you don't want schema autodetection
results.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
               table_schema=schema, if_exists=if_exists)

guillaume blaquiere

My recommendation is to extract the Oracle table content into files (CSV format, for example), copy the files into Cloud Storage, and then load them into BigQuery.
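
A minimal sketch of the copy step, assuming the exported CSV files sit in a local directory and the bucket already exists (the bucket name, prefix, and directory are illustrative placeholders):

import os
from google.cloud import storage

client = storage.Client()
bucket = client.bucket('my-staging-bucket')  # assumption: existing bucket
csv_dir = '/path/to/exports'                 # assumption: directory with exported CSVs

for file_name in os.listdir(csv_dir):
    if not file_name.endswith('.csv'):
        continue
    blob = bucket.blob('oracle_exports/{}'.format(file_name))
    blob.upload_from_filename(os.path.join(csv_dir, file_name))
    print('Uploaded gs://my-staging-bucket/oracle_exports/{}'.format(file_name))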

Dataflow is overkill (more expensive, less efficient, takes more time) if the transformation you want to do can be done in SQL.

However, if you need to call an external API (for data transformation, such as an ML API, for example) or if you want to sink the data into a database other than BigQuery (Firestore, Bigtable, Cloud SQL, ...), Dataflow is the right tool.

EDIT

To go into more detail, I assume that the tables all land in the same dataset. Then the code is simple:

from google.cloud import bigquery

def hello_gcs_generic(data, context):
    """Background Cloud Function triggered when a file lands in Cloud Storage."""
    client = bigquery.Client()
    dataset_id = 'my_dataset'

    bucket = data['bucket']
    path = data['name']

    # Table name = file name without directory prefix and extension
    table_name = path[path.rfind('/')+1:path.rfind('.')]

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="date"  # Name of the column to use for partitioning.
        ),
        source_format=bigquery.SourceFormat.CSV
    )

    uri = "gs://{}/{}".format(bucket, path)

    load_job = client.load_table_from_uri(
        uri, dataset_ref.table(table_name), job_config=job_config
    )  # API request
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

Here the Cloud Function is invoked for each file dropped into the bucket. Therefore, if you drop 300 files at the same time, 300 function invocations are triggered and the loads run in parallel.

Several points:

  • The table name is equal to the file name.
  • By default, the behavior is write-append (the data is added to the table).
  • I strongly recommend partitioning your table on a field, here a date field.
  • The load job is often quick. If it takes more than 9 minutes, use Cloud Run (limited to 15 minutes) or don't wait for the end of the load (delete the load_job.result() line).
  • The CSV files start with one header row. Not mandatory, but with schema auto-detection it helps name the columns nicely.

Note: I assume that all the files dropped into Cloud Storage have to be loaded into BigQuery. If not, you can add a filter as described in one of my articles.
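
For example, a simple filter at the top of the function could skip unwanted objects; the .csv suffix and the oracle_exports/ prefix below are illustrative assumptions, not from the original answer:

def hello_gcs_generic(data, context):
    path = data['name']
    # Hypothetical filter: only load CSV files under a given prefix, ignore everything else
    if not path.endswith('.csv') or not path.startswith('oracle_exports/'):
        print('Ignoring {}'.format(path))
        return
    # ... continue with the load job as shown above ...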