Auto Loader - file notification with modifiedBefore


With the current configuration, every hour we get a new folder with new data in it.


I'm using file notification mode, and I'd prefer not to switch to directory listing. However, the CSV files in the latest folder are updated constantly, which causes job failures when Auto Loader tries to read a CSV file while it is being updated. I'm looking for a way to exclude the latest folder from being read and have come across the modifiedBefore option, but I'm not sure whether it is compatible with file notification mode.


JayashankarGS On

modifiedBefore is a generic file source option in Auto Loader, and it can be used with file notification mode.

You mentioned that a new folder arrives every hour and that the files in the latest folder are updated very frequently, causing errors when you do an incremental load with Auto Loader.

To avoid this, you can either provide a path pattern that matches all folders except the latest one, or use the modifiedBefore option.

In both cases, you need to know the cutoff timestamp.

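For example, if you don't need the data from 13:00:00 onward, you can use a pattern like the one below. Auto Loader path filters use glob syntax rather than regular expressions, so alternation is written with braces and a single arbitrary character with ?:

"/2023-12-20T{0[0-9],1[0-2]}?[0-9][0-9]?[0-9][0-9]Z/"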
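If the cutoff hour changes from run to run, the pattern can be built programmatically. The following is only a minimal sketch: `hourly_folder_glob` is a hypothetical helper, and it assumes the folder names start with `<YYYY-MM-DD>T<HH>` as in the example above. It lists the allowed hours explicitly in a glob brace group.

```python
def hourly_folder_glob(day: str, cutoff_hour: int) -> str:
    """Build a glob for the folders of `day` whose hour is before `cutoff_hour`.

    Assumption: folder names start with '<YYYY-MM-DD>T<HH>'.
    """
    if not 1 <= cutoff_hour <= 24:
        raise ValueError("cutoff_hour must be between 1 and 24")
    hours = ",".join(f"{h:02d}" for h in range(cutoff_hour))
    # Braces are glob alternation; '*' matches the rest of the folder name.
    return f"{day}T{{{hours}}}*"


pattern = hourly_folder_glob("2023-12-20", 13)
print(pattern)
```

The resulting string would be appended to the base path passed to the reader; whether it matches depends on how your folders are actually named.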

For more information about patterns, refer to the Auto Loader documentation on glob path filters.

Or, you can use the modifiedBefore option:

autoloader_config = {
    "cloudFiles.format": "csv",
    "cloudFiles.useNotifications": "true",
    "cloudFiles.resourceGroup": resourcegroup,
    "cloudFiles.clientId": client_id,
    "cloudFiles.clientSecret": client_secret,
    "cloudFiles.tenantId": tenant_id,
    "cloudFiles.connectionString": conn_string,
    "cloudFiles.subscriptionId": subscription_id,
    "cloudFiles.schemaLocation": schema_location,
    "header": True,
    "modifiedBefore": "2023-12-20 13:00:00.000000 UTC+5:30"
}
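For completeness, here is a sketch of how such a dictionary might be passed to the reader. `build_autoloader_stream` and `source_path` are hypothetical names introduced for illustration, and `spark` is assumed to be an existing SparkSession on a Databricks cluster.

```python
def build_autoloader_stream(spark, config: dict, source_path: str):
    """Return a streaming DataFrame that reads `source_path` with Auto Loader.

    `spark` is assumed to be an existing SparkSession; `config` is an options
    dictionary such as autoloader_config.
    """
    return (
        spark.readStream
        .format("cloudFiles")   # Auto Loader source
        .options(**config)      # notification settings plus modifiedBefore
        .load(source_path)
    )
```

Keeping the options in one dictionary makes it easy to change only the modifiedBefore entry between runs.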

If you want to filter based on the last hour, you can compute the cutoff like this:

from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

# Current time in India, minus one hour
india_dt = datetime.now(tz=ZoneInfo("Asia/Kolkata"))
cutoff = india_dt - timedelta(hours=1)

Printing the result gives, for example: 2023-12-20 11:41:39.862054+05:30

You can then use that value in the modifiedBefore option.
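The printed value does not have quite the same layout as the string used in the configuration above, so it may be safer to format it explicitly. This is a minimal sketch under that assumption: `modified_before_value` is a hypothetical helper that converts the cutoff to UTC and writes it in the `YYYY-MM-DD HH:MM:SS.ffffff UTC+0` layout.

```python
from datetime import datetime, timedelta, timezone


def modified_before_value(now: datetime, hours_back: int = 1) -> str:
    """Format `now - hours_back` as a UTC timestamp string for modifiedBefore."""
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    cutoff_utc = (now - timedelta(hours=hours_back)).astimezone(timezone.utc)
    # Same layout as the timestamp in the configuration above, normalized to UTC.
    return cutoff_utc.strftime("%Y-%m-%d %H:%M:%S.%f") + " UTC+0"


# Fixed +05:30 offset standing in for India Standard Time.
ist = timezone(timedelta(hours=5, minutes=30))
value = modified_before_value(datetime(2023, 12, 20, 12, 41, 39, 862054, tzinfo=ist))
print(value)
```

Because the option is a fixed string once the stream has started, a rolling one-hour cutoff presumably has to be recomputed each time the job is launched.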

Note: You need to specify the time zone that matches the folder names created every hour.

Kashyap On

If you're writing large files to some location, and there is a possibility that the job that reads these files might run while some of them are still incomplete, then you'll have to write some code / do some juggling in the job that reads the files.

Usually this problem is solved by changing the contract with the job that writes the files: change its logic so that writing a new file is a two-step process:

  1. Write the file to a separate folder: in-progress/<final-file-name>
  2. Rename/move the file from in-progress/<final-file-name> to staging/<final-file-name>.

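On storage where renaming is an atomic operation (for example, ADLS Gen2 with a hierarchical namespace), your reader will never be able to read an incomplete file. On plain object stores a rename is typically a copy followed by a delete, so check what your storage guarantees.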
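The two-step write can be sketched as follows. This is an illustration on a local POSIX file system, not the writer job's actual code: `publish_file` and the directory arguments are hypothetical, and on cloud storage the move would go through that storage's own rename call instead of `os.replace`.

```python
import os
from pathlib import Path


def publish_file(data: bytes, in_progress_dir: Path, staging_dir: Path, name: str) -> Path:
    """Write `data` under in_progress_dir, then move it into staging_dir."""
    in_progress_dir.mkdir(parents=True, exist_ok=True)
    staging_dir.mkdir(parents=True, exist_ok=True)
    tmp_path = in_progress_dir / name
    final_path = staging_dir / name

    # Step 1: write the complete file where the reader does not look.
    with open(tmp_path, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())

    # Step 2: move it into place. os.replace is atomic on POSIX when both
    # paths are on the same file system.
    os.replace(tmp_path, final_path)
    return final_path
```

The reader only ever watches the staging folder, so it sees a file either not at all or in full.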

If your reader is looking for specific file suffixes/patterns then you can also use <final-file-name>.in-progress instead of a separate folder. Logic remains the same.
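A small sketch of why the suffix variant works: a glob such as `*.csv` does not match a name that ends in `.in-progress`. The helper `completed_files` and the file names are hypothetical. On the reader side the same pattern would presumably be supplied through a file-name filter option such as pathGlobFilter, which is an assumption to verify against your runtime.

```python
from fnmatch import fnmatchcase


def completed_files(names, pattern="*.csv"):
    """Keep only the names that match the reader's glob pattern."""
    return [n for n in names if fnmatchcase(n, pattern)]


names = ["sales_01.csv", "sales_02.csv.in-progress"]
ready = completed_files(names)
print(ready)
```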