IgnoreMissingFiles option not working in certain scenarios


I have a weird problem and I can't seem to find the reason for it.

Goal: have spark.sql.files.ignoreMissingFiles take effect in Spark Structured Streaming when using dbx by Databricks Labs (essentially it is plain Spark code). I have some files for which notifications have been received but which are now missing from storage. This can easily be simulated in Databricks by:

  1. first running an Autoloader job (in batch mode with availableNow=True)
  2. clicking the "test connection" button on the external location that the streaming job relies on
  3. running the Autoloader job again

Without ignoreMissingFiles, the job fails because some "validate_credential" file is missing.

I can get it to work in a notebook by setting either the data source option ignoreMissingFiles or the cluster config spark.sql.files.ignoreMissingFiles.

E.g. this works:

(
    spark.readStream
    .format("cloudFiles")
    .options(**cloudFilesOptions)
    .option("ignoreMissingFiles",True)
    .schema(input_schema)
    .load(source_path)
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .toTable("test_autoloader2")
)

In addition, both of the following workarounds also work:

# inside the notebook
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

or setting it in the Spark config of the cluster.
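
For the cluster variant, the entry in the cluster's Spark config box (Advanced options > Spark) is just the key/value pair, one per line:

spark.sql.files.ignoreMissingFiles true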

However, when working with DBX, the same approach does not work. From what I can see, only the options starting with cloudFiles are picked up, not the others. In other words, the Databricks Autoloader-specific options take effect, but the plain Spark config does not.

E.g. this does not work (after clicking the test connection button, it still complains about missing files):

def _process_timeseries(
    self,
) -> None:
    """Ingests the measurements data into the bronze (raw) layer."""
    # Attempt to set the config on the SparkContext conf
    # (this is the part that does not seem to take effect)
    self.spark.sparkContext._conf.set("spark.sql.files.ignoreMissingFiles", "true")

    # Read the input data
    df = (
        self.spark.readStream.format("cloudFiles")
        .schema(ext_schema)
        .option("cloudFiles.format", "json")
        .option(
            "cloudFiles.schemaLocation",
            f"{self.checkpoint_path}/{self.catalog_name}/etl_bronze_timeseries{self.postfix}",
        )
        .option("cloudFiles.useNotifications", "true")
        .option("cloudFiles.subscriptionId", self.sub_id)
        .option("cloudFiles.tenantId", self.tenant_id)
        .option("cloudFiles.resourceGroup", self.rg)
        .option(
            "cloudFiles.clientSecret",
            self.dbutils.secrets.get(scope="keyvault_secrets", key="databricks-autoloader-clientsecret"),
        )
        .option("cloudFiles.clientId", self.client_id)
        .option("cloudFiles.backfillInterval", "1 day")
        .option("ignoreMissingFiles", True)
        .load(f"{self.input_path}/")
    )
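
The only difference I can see compared to the working notebook is where the config is set: the notebook sets it on the session conf, whereas here it is set on the underlying SparkContext conf object, which, as far as I understand, is not re-read once the context exists. The session-level equivalent would be this one-liner (a sketch, using the same self.spark from the task):

# Sketch: set the runtime SQL conf on the active session instead of on the
# SparkContext conf object; spark.sql.files.ignoreMissingFiles is a runtime
# conf, so spark.conf.set can change it on a live session.
self.spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")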

This method (defined above) is part of a class derived from the Task class defined in DBX, i.e. the class and its __init__ method look like this:

class ETLBronzeTimeseries(Task):
    def __init__(self, spark=None, source_container=None, postfix=""):
        super().__init__(spark)
        self.input_path = someval
        self.postfix = someval

        self.checkpoint_path = self.spark.conf.get("checkpoint_path")
        # ... other stuff

The super().__init__(spark) call is where the SparkSession is obtained (defined within dbx), in the Task class, using:

@staticmethod
def _prepare_spark(spark) -> SparkSession:
    if not spark:
        return SparkSession.builder.getOrCreate()
    else:
        return spark

I wondered whether the SparkSession was somehow not getting updated, so I also tried changing it to

return SparkSession.builder.config("spark.sql.files.ignoreMissingFiles","true").getOrCreate()

but that did not work either.

I also tried setting the config on the cluster used by the job, but that did not work either, and I really cannot figure out why, because the cluster configs should be picked up.

I suspect that the SparkSession is not getting updated. In a few other places I read that the associated SparkContext may need to be stopped and recreated, but the thing is: I already changed the SparkSession creation (mentioned above) to take that config into account.
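
To narrow this down, a quick check right before building the stream would be to print what the active session actually reports (a small diagnostic sketch):

# Diagnostic sketch: ask the active session which value it sees for the
# config just before the readStream is built.
print(
    "ignoreMissingFiles =",
    self.spark.conf.get("spark.sql.files.ignoreMissingFiles", "<not set>"),
)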

What am I missing?

The error I keep getting is:

com.databricks.sql.io.FileReadException: Error while reading file abfss:[email protected]/validate_credential_2023-09-20-06-44-32. [CLOUD_FILE_SOURCE_FILE_NOT_FOUND] A file notification was received for file: abfss://[email protected]/validate_credential_2023-09-20-06-44-32 but it does not exist anymore. Please ensure that files are not deleted before they are processed. To continue your stream, you can set the Spark SQL configuration spark.sql.files.ignoreMissingFiles to true.

And of course I do not want to use a notebook for production-grade code.

DBX: https://github.com/databrickslabs/dbx

Funny thing is: when setting .option("ignoreMissingFiles", True) and going to the Spark UI, I can see the option in the DataFrame properties of the failed stage, and yet it is not recognized.


There is 1 answer below.

Marvin Schenkel answered:

I have just spent the entire morning banging my head against this issue and I think I have figured it out. The reason these 'validate_credential_xx' files show up is that Unity Catalog appears to use them to verify it can access the storage container when you configure an external location object for it. After verification, it deletes the file again.

When you have Databricks Autoloader configured, it will have deployed an Event Grid subscription plus a storage queue to monitor incoming files in your storage container. So what happens is:

  1. Unity Catalog creates a file in the container to verify it can access it.
  2. Event Grid fires an event saying that a file named 'validate_credential_xx' has been created.
  3. The message ends up in the storage queue to be processed by Databricks Autoloader.
  4. Unity Catalog deletes the 'validate_credential_xx' file from the container.

Now when you start a job using Databricks Autoloader, it looks in the queue for files to process. It finds a message saying that it needs to load 'validate_credential_xx', but it cannot find that file in the container. This results in the error you are seeing.
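
You can verify this by peeking at the storage queue that Autoloader consumes and looking for the stale notification. A minimal sketch using the azure-storage-queue Python SDK (the connection string and queue name are placeholders for your own setup):

from azure.storage.queue import QueueClient
import base64, json

queue = QueueClient.from_connection_string(
    conn_str="<storage-account-connection-string>",  # placeholder
    queue_name="<autoloader-queue-name>",            # placeholder
)

# Peek (do not dequeue) the first few messages and print their payloads;
# depending on the setup the content may be base64-encoded Event Grid JSON.
for msg in queue.peek_messages(max_messages=10):
    try:
        body = json.loads(base64.b64decode(msg.content))
    except Exception:
        body = msg.content
    print(body)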

I solved this by adding an extra filter to the Event Grid subscription so that 'subject' may not begin with '/blobServices/default/containers/"your container"/blobs/validate_credential_'. This prevents messages for these kinds of files from being sent to the queue.
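
For reference, in the Azure portal this ends up as an advanced filter on the event subscription along these lines (the container name is a placeholder):

Key:      Subject
Operator: String does not begin with
Value:    /blobServices/default/containers/<your-container>/blobs/validate_credential_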

Please note that after the initial Databricks failure, the item is removed from the queue but is still stored in the RocksDB at your checkpoint location. This explains why the error keeps popping up even after the message has been removed from the queue.