I am using the Salesforce source connector for Airbyte to ingest a report from Salesforce. The ingestion itself can be done with a single Docker command (tested locally and it works):

docker run --rm -v $HOME/internal-reporting/airbyte/airbyte-integrations/connectors/source-salesforce/secrets:/secrets -v $HOME/internal-reporting/airbyte/airbyte-integrations/connectors/source-salesforce/integration_tests:/integration_tests airbyte/source-salesforce read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json

The output of the command is unformatted JSON (one JSON object per line). I am then trying to use Python to apply some light transformations to the output, put it in a Pandas dataframe, and load it into a Lakehouse table using Spark. I would like to use Airflow to orchestrate the entire process, from running the Docker command to writing the dataframe to the table. I believe the way to do this is with a KubernetesPodOperator, but I am unsure how to mount the secrets/config.json and integration_tests/configured_catalog.json files into the Kubernetes pod, how to package the command into the KubernetesPodOperator, and how to subsequently pass the output of the Docker command to the Python step.

Here is the relevant code snippet from what I have so far:

import logging
import os
import subprocess

import pandas as pd
import pyspark.sql.types as psType
from pyspark.sql import SparkSession


def get_salesforce_data(raw_data):
    logging.info("Requesting Salesforce Opportunity Report.")

    opportunity_schema = psType.StructType(
        [
            psType.StructField("Id", psType.StringType(), True),
            # ... rest of schema ...
            psType.StructField("End_Date__c", psType.ByteType(), True),
        ]
    )

    # each line of the connector output is an Airbyte protocol message
    df = pd.read_json(raw_data, lines=True)

    # keep only the RECORD messages, then unpack the nested record -> data payload
    record_df = df.loc[df["type"] == "RECORD"]
    opp_df = pd.DataFrame(record_df["record"].values.tolist())
    report_df = pd.DataFrame(opp_df["data"].values.tolist())

    # write the Dataframe to the lakehouse

    logger = logging.getLogger(__name__)

    # use the pod's IP as the Spark driver host so the executors can reach the driver
    output = subprocess.run(["hostname", "-i"], capture_output=True)  # nosec
    hostname = output.stdout.decode().strip()
    spark = (  # noqa
        SparkSession.builder.config("spark.driver.host", hostname)
        .config("spark.executor.instances", 2)
        .config(
            "spark.kubernetes.container.image",
            f"{os.getenv('SPARK_IMAGE')}:{os.getenv('SPARK_IMAGE_TAG')}",
        )
        .getOrCreate()
    )

    spark.sql("create database if not exists reporting")

    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    spark_df = spark.createDataFrame(report_df)
    spark_df.write.format("delta").mode("append").saveAsTable("reporting.opportunities")

    logger.info("Table successfully written.")

docker_command = [
        "docker",
        "run",
        "--rm",
        "-v",
        "/project/airflow/source-salesforce/secrets:/secrets",
        "-v /project/airflow/source-salesforce/integration_tests:/integration_tests",
        "/project/airflow/source-salesforce",
        "read",
        "--config",
        "/secrets/config.json",
        "--catalog",
        "/integration_tests/configured_catalog.json",
        "-it",
        "salesforce_ingest",
    ]

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": 
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
}
with DAG(
    # DAG details
) as dag:
    # Get data from Salesforce
    docker_ingest = KubernetesPodOperator(
        task_id=f"ingest_salesforce",
        namespace="airflow",
        name="salesforce_ingest",
        image="airbyte/source-salesforce:latest",
        cmds=docker_command,
        dag=dag,
    )

    ingest_salesforce = PySparkOperator(
        task_id=f"load_opportunity_report",
        python_callable=get_salesforce_data,
        num_executors=2,
        op_args=[docker_ingest.output],
    )

    # assign task dependencies

    docker_ingest >> ingest_salesforce
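One thing I'm unsure about with op_args=[docker_ingest.output] is that, as far as I understand, the KubernetesPodOperator only pushes to XCom whatever the pod writes to /airflow/xcom/return.json (and only when do_xcom_push=True), not the container's stdout, so I may also need something like this on the consuming side (load_opportunity_report here is a hypothetical plain-Python callable, and I doubt a full report even fits in an XCom):

def load_opportunity_report(**context):
    # pull the raw JSON-lines output pushed by the ingest pod, then transform and load it
    raw_data = context["ti"].xcom_pull(task_ids="ingest_salesforce")
    get_salesforce_data(raw_data)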

The error I currently receive is: unable to start container process: exec: "docker": executable file not found in $PATH: unknown

I believe I'm not using the Docker command correctly in the KubernetesPodOperator: rather than invoking docker inside the pod, I suspect I should mount the secrets and catalog as volumes and have the container run just read, with the remaining flags passed through the arguments. I'm also not sure how to translate the paths on my local machine into paths that exist wherever the DAG is deployed from Git.
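Something along these lines is what I have in mind for the ingest task, assuming a recent cncf.kubernetes provider that accepts kubernetes client models for volumes; the Secret and ConfigMap names are placeholders I would create beforehand, and I'm not sure whether read belongs in cmds or whether everything should go into arguments with the image entrypoint left alone (shown that way below):

from kubernetes.client import models as k8s

# placeholder Secret/ConfigMap holding config.json and configured_catalog.json
config_volume = k8s.V1Volume(
    name="salesforce-config",
    secret=k8s.V1SecretVolumeSource(secret_name="salesforce-source-config"),
)
catalog_volume = k8s.V1Volume(
    name="salesforce-catalog",
    config_map=k8s.V1ConfigMapVolumeSource(name="salesforce-configured-catalog"),
)

docker_ingest = KubernetesPodOperator(
    task_id="ingest_salesforce",
    namespace="airflow",
    name="salesforce_ingest",
    image="airbyte/source-salesforce:latest",
    # keep the image's own entrypoint and only pass the connector arguments
    arguments=[
        "read",
        "--config", "/secrets/config.json",
        "--catalog", "/integration_tests/configured_catalog.json",
    ],
    volumes=[config_volume, catalog_volume],
    volume_mounts=[
        k8s.V1VolumeMount(name="salesforce-config", mount_path="/secrets", read_only=True),
        k8s.V1VolumeMount(name="salesforce-catalog", mount_path="/integration_tests", read_only=True),
    ],
    get_logs=True,
)

Any suggestions would be appreciated, thank you!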
