I am new to Airflow. I want to schedule a job that compares the record counts of two tables in different systems and checks whether they match. One source is GCP (BigQuery), the other is Salesforce.
I found BigQueryOperator
to run the query on the GCP side and return the count result, but I couldn't find any operator that looks like a SalesforceQueryOperator
which I could assign to an Airflow task.
So basically, I was talking about this which we can use to bring the count result:
t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    google_cloud_storage_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)
I know that I can create a function, import the library, create a connection to Salesforce, and run the query to get the count, but I don't want to follow that approach (part of the code is given below), which I have already tried:
def salesforcequery_count():
    from simple_salesforce import Salesforce
    import requests

    session = requests.Session()
    # manipulate the session instance (optional)
    sf = Salesforce(
        username='[email protected]', password='password', organizationId='OrgId',
        session=session)
    count_record = sf.query("SELECT count(id) FROM Contact")
    # for row in data:
    #     process(row)
    return count_record
Instead, I want to create a custom operator that looks like a SalesforceQueryOperator
and works like BigQueryOperator:
it should run the query against the Salesforce table and bring back the result.
Here is the reference: https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
Any help will be really appreciated.
You can use the existing SalesforceHook to create your own custom operator.
Here is an example:
Then use it in your DAG:
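A minimal DAG sketch, assuming the custom operator (here called SalesforceCountOperator, an illustrative name) is importable from a local module; the module path, DAG id, and schedule below are all hypothetical:

```python
from datetime import datetime

from airflow import DAG

# Hypothetical import path -- adjust to wherever your operator class lives
from salesforce_operators import SalesforceCountOperator

with DAG(
    dag_id="compare_salesforce_bigquery_counts",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    sf_count = SalesforceCountOperator(
        task_id="salesforce_count",
        soql="SELECT COUNT() FROM Contact",
        salesforce_conn_id="salesforce_default",
    )
```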
Where salesforce_default is a connection that you add in Airflow. You can see how to add it here: Salesforce Connection
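For reference, the connection can also be created from the Airflow CLI; the credential values and the extras keys below are placeholders (check the Salesforce provider docs for the exact extra fields your provider version expects):

```shell
# Hypothetical values -- the conn id must match salesforce_conn_id in the operator
airflow connections add 'salesforce_default' \
    --conn-type 'salesforce' \
    --conn-login 'user@example.com' \
    --conn-password 'password' \
    --conn-extra '{"security_token": "your_token"}'
```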