I am using the Salesforce source connector for Airbyte to ingest a report from Salesforce. The ingestion itself can be done with a single Docker command (tested locally and it works):
docker run --rm -v $HOME/internal-reporting/airbyte/airbyte-integrations/connectors/source-salesforce/secrets:/secrets -v $HOME/internal-reporting/airbyte/airbyte-integrations/connectors/source-salesforce/integration_tests:/integration_tests airbyte/source-salesforce read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
The output of the command is unformatted JSON, one message per line. I am then trying to use Python to apply some light transformations to the output, put it in a Pandas DataFrame, and load it into a Lakehouse table using Spark. I would like to use Airflow to orchestrate the entire process, from running the Docker command through writing the DataFrame to the table. I believe the way to do this is with a KubernetesPodOperator, but I am unsure how to mount the secrets/config.json and integration_tests/configured_catalog.json files into the Kubernetes pod, how to package the command into the KubernetesPodOperator, and how to subsequently pass the output of the Docker command to Python.
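For concreteness, here is roughly what I imagine the operator would look like, with the two JSON files stored in a Kubernetes Secret and mounted into the pod. The Secret name airbyte-salesforce-config and the single /secrets mount path are my own guesses (I've put both files in one Secret just for this sketch), and the provider import path may differ by Airflow version:

```python
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# Hypothetical: a Kubernetes Secret "airbyte-salesforce-config" holding
# config.json and configured_catalog.json, mounted read-only at /secrets.
config_volume = k8s.V1Volume(
    name="airbyte-config",
    secret=k8s.V1SecretVolumeSource(secret_name="airbyte-salesforce-config"),
)
config_mount = k8s.V1VolumeMount(
    name="airbyte-config", mount_path="/secrets", read_only=True
)

ingest = KubernetesPodOperator(
    task_id="ingest_salesforce",
    namespace="airflow",
    name="salesforce_ingest",
    image="airbyte/source-salesforce:latest",
    # the image's entrypoint already runs the connector, so (as I understand
    # it) only the subcommand and its flags go in `arguments`
    arguments=[
        "read",
        "--config", "/secrets/config.json",
        "--catalog", "/secrets/configured_catalog.json",
    ],
    volumes=[config_volume],
    volume_mounts=[config_mount],
)
```

I don't know if this is the right way to get the files onto the pod, which is part of what I'm asking.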
Here is the relevant code snippet from what I have so far:
import logging
import os
import subprocess

import pandas as pd
import pyspark.sql.types as psType
from pyspark.sql import SparkSession


def get_salesforce_data(raw_data):
    logging.info("Requesting Salesforce Opportunity Report.")
    opportunity_schema = psType.StructType(
        [
            psType.StructField("Id", psType.StringType(), True),
            # ... rest of schema ...
            psType.StructField("End_Date__c", psType.ByteType(), True),
        ]
    )
    # Airbyte emits one JSON message per line; keep only the RECORD messages
    df = pd.read_json(raw_data, lines=True)
    record_df = df.loc[df["type"] == "RECORD"]
    opp_df = pd.DataFrame(record_df["record"].values.tolist())
    report_df = pd.DataFrame(opp_df["data"].values.tolist())

    # write the DataFrame to the lakehouse
    logger = logging.getLogger(__name__)
    output = subprocess.run(["hostname", "-i"], capture_output=True)  # nosec
    hostname = output.stdout.decode().strip()
    spark = (  # noqa
        SparkSession.builder.config("spark.driver.host", hostname)
        .config("spark.executor.instances", 2)
        .config(
            "spark.kubernetes.container.image",
            f"{os.getenv('SPARK_IMAGE')}:{os.getenv('SPARK_IMAGE_TAG')}",
        )
        .getOrCreate()
    )
    spark.sql("create database if not exists reporting")
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    spark_df = spark.createDataFrame(report_df)
    spark_df.write.format("delta").mode("append").saveAsTable("reporting.opportunities")
    logger.info("Table successfully written.")
docker_command = [
    "docker",
    "run",
    "--rm",
    "-v",
    "/project/airflow/source-salesforce/secrets:/secrets",
    "-v",
    "/project/airflow/source-salesforce/integration_tests:/integration_tests",
    "/project/airflow/source-salesforce",
    "read",
    "--config",
    "/secrets/config.json",
    "--catalog",
    "/integration_tests/configured_catalog.json",
    "-it",
    "salesforce_ingest",
]
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email":  # value elided
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
}
with DAG(
    # DAG details
) as dag:
    # Get data from Salesforce
    docker_ingest = KubernetesPodOperator(
        task_id="ingest_salesforce",
        namespace="airflow",
        name="salesforce_ingest",
        image="airbyte/source-salesforce:latest",
        cmds=docker_command,
        dag=dag,
    )

    ingest_salesforce = PySparkOperator(
        task_id="load_opportunity_report",
        python_callable=get_salesforce_data,
        num_executors=2,
        op_args=[docker_ingest.output],
    )

    # assign task dependencies
    docker_ingest >> ingest_salesforce
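Separately, for the parsing step: my understanding is that the connector writes one JSON message per stdout line, and only messages with "type": "RECORD" carry rows (under record.data). A minimal standalone sketch of that filtering, where the sample messages are my own mock and not real connector output:

```python
import json

# Mock of the connector's line-delimited JSON output (shapes are my assumption)
raw_output = "\n".join([
    json.dumps({"type": "LOG", "log": {"level": "INFO", "message": "Starting sync"}}),
    json.dumps({"type": "RECORD", "record": {"stream": "opportunities",
                                             "data": {"Id": "0065e000003", "Amount": 1000}}}),
])

# Keep only RECORD messages and pull out the row payloads
records = []
for line in raw_output.splitlines():
    msg = json.loads(line)
    if msg["type"] == "RECORD":
        records.append(msg["record"]["data"])

print(records)  # [{'Id': '0065e000003', 'Amount': 1000}]
```

This is the same filtering my get_salesforce_data does with Pandas, just shown in isolation.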
The error I currently receive is:
unable to start container process: exec: "docker": executable file not found in $PATH: unknown
I believe that I'm not using the Docker command correctly in the KubernetesPodOperator, and that instead I should mount the secrets and catalog as volumes and pass just read (with the rest of the flags) via the operator's arguments rather than cmds. I'm also not sure how to translate the paths on my local machine to paths in my Git repository. Any suggestions would be appreciated, thank you!
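To make my mental model concrete: I think the KubernetesPodOperator replaces docker run itself, so only the part of my working local command after the image name should survive into the operator (with the -v mounts handled separately as Kubernetes volumes). Something like this mapping, if my understanding is right:

```python
# Working local command:
#   docker run --rm -v .../secrets:/secrets -v .../integration_tests:/integration_tests \
#       airbyte/source-salesforce read --config /secrets/config.json \
#       --catalog /integration_tests/configured_catalog.json
#
# What I think it maps to on the operator:
image = "airbyte/source-salesforce:latest"  # replaces the docker-run image name
arguments = [                               # everything after the image name
    "read",
    "--config", "/secrets/config.json",
    "--catalog", "/integration_tests/configured_catalog.json",
]
```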