Azure Data Factory, Batch Service, Python: Unable to append blob with csv chunks


An Azure Data Factory pipeline runs a Custom activity with Batch Service as a linked service, which executes Python code. The transformation was built to run locally, and I want to make it work on Azure, saving blobs (csv files) into Azure Storage (Azure Data Lake).

The transformation runs on chunks and is performed within a for loop.

for i, in_chunk in enumerate(pd.read_csv(source, chunksize=6000, sep=";")):
    # transformation happens and spits out the out_chunk DataFrame
    # holding the 6000 rows out of the entire file
    kwargs = {'mode': 'a', 'header': False} if i > 0 else {}
    out_chunk.to_csv(cdl_file, sep="|", index=False, **kwargs)

After this, I tried different approaches, for example the ones described in this question and its answer: My original question for another issue

The solution described in that question and answer didn't throw errors, but it didn't store the entire file, only the 6000 rows specified as a chunk.

What am I doing wrong? I do not understand how this should be handled.

EDIT: As requested by JayashankarGS, I added the code I tried and a screenshot of what happened.

import logging
import os
import time

import pandas as pd
from azure.storage.blob import BlobClient


def transform_data(v, outputType, eventType, fileName, config, source, conn_string, dl, encoding, in_dtypes=None, chunkSize=10000, in_sep=";"):
    folderName = 'temp'
    containerName = 'input'
    outputBlobName = folderName + "/" + fileName
    inputBlobPath = containerName + "/" + outputBlobName
    blob = BlobClient.from_connection_string(conn_str=conn_string, container_name=containerName, blob_name=outputBlobName)

    credential = {'connection_string': conn_string}
    accountName = conn_string.split("AccountName=")[1].split(";")[0]

    adls_path = 'abfs://' + containerName + '@' + accountName + '.dfs.core.windows.net/' + outputBlobName

    template = pd.DataFrame(columns=v.headers[outputType])
    transformationSchema = config[outputType + "_out_transformations"]

    logging.info('Reading data chunk...')
    for i, in_chunk in enumerate(pd.read_csv(source,
                                             chunksize=chunkSize,
                                             sep=in_sep,
                                             encoding='unicode_escape',
                                             dtype=in_dtypes)):
        logging.info('Handle duplicates and missing fields...')

        in_chunk.fillna('', inplace=True)
        out_chunk = template.copy()

        out_chunk.fillna('', inplace=True)

        out_chunk.drop_duplicates(subset=[config["composite_key"]["key_partA"], config["composite_key"]["key_partB"], config["composite_key"]["key_partC"]], inplace=True)

        logging.info('Start data transformation for schema: ' + outputType)

        for name, spec in transformationSchema.items():
            out_chunk[name] = transform_column(in_chunk, spec)

        kwargs = {'mode': 'a', 'header': False}

        logging.info('Transformation was successful for ' + outputType)
        dateTime = time.strftime("%Y%m%d%H%M%S")

        if not os.path.exists(containerName + "/" + folderName + "/"):
            os.mkdir(containerName)
            os.mkdir(containerName + "/" + folderName + "/")

        print(f"Uploading chunk: {len(out_chunk)}")
        logging.info('Trying to store transformed file in Azure Storage...')

        out_chunk.to_csv(adls_path, storage_options=credential, sep=dl, index=False, **kwargs)

The result is two files generated and stored in Azure Storage. As you can see in the output of running this with Batch Service on Azure Data Factory, it processes 10000 rows (the given chunk size) and then tries to do the same for the second file. The "file not found" error comes from a validation step that runs after the transformation (please ignore that warning).

(screenshot: Batch Service run output from Azure Data Factory)
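
A small check like the following (a minimal sketch; the connection string and blob path are placeholders for the values used inside transform_data) shows what type of blob actually ends up in storage and how large it is:

from azure.storage.blob import BlobClient

# Placeholders; in practice these are the same values used inside transform_data above.
conn_string = '<storage connection string>'
containerName = 'input'
outputBlobName = 'temp/<fileName>'

blob = BlobClient.from_connection_string(
    conn_str=conn_string,
    container_name=containerName,
    blob_name=outputBlobName,
)

props = blob.get_blob_properties()
# blob_type shows whether the file was created as a BlockBlob or an AppendBlob;
# size shows whether more than one chunk actually made it into the file.
print(props.blob_type, props.size)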

1 Answer

The reason is that you end up with different blob types.

In your code:

adls_path = 'abfs://<container>@<account>.dfs.core.windows.net/outfromVS.csv'
credentials = {'connection_string': blob_conn}

for i, in_chunk in enumerate(pd.read_csv('./hts-sample-test.csv', chunksize=500, sep=";")):
    kwargs = {'mode': 'a', 'header': False} if i > 0 else {}
    print(in_chunk.count())
    in_chunk.to_csv(adls_path, storage_options=credentials, **kwargs)

For the first iteration you are not passing the mode, so by default the file is created as a Block blob.

(screenshot: blob properties showing the Block blob type)

When you are writing to the same blob in storage, the blob type has to stay the same.

So, just give the mode from the start: kwargs = {'mode': 'a', 'header': False}

Below is the count of my data:

(screenshot: row count of the test data)

Now uploading with a chunk size of 500:

for i, in_chunk in enumerate(pd.read_csv('./hts-sample-test.csv', chunksize=500)):
    kwargs = {'mode': 'a', 'header': False}
    print(f"Uploading chunk: {len(in_chunk)}")
    in_chunk.to_csv(adls_path, storage_options=credentials, **kwargs)

Output:

(screenshots: console output showing each 500-row chunk being uploaded)

Reading the file back again:

(screenshot: the complete appended file read back from storage)
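
A quick way to double-check from Python (a minimal sketch; the path and connection string are placeholders for the ones used above) is to read the blob back with pandas and compare the row count with the source file:

import pandas as pd

# Placeholders for the same ADLS path and connection string used in the write loop above.
adls_path = 'abfs://<container>@<account>.dfs.core.windows.net/outfromVS.csv'
credentials = {'connection_string': '<storage connection string>'}

# The chunks were written with header=False, so there is no header row to parse.
result = pd.read_csv(adls_path, storage_options=credentials, header=None)
print(f"Total rows read back: {len(result)}")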

Note: Make sure the output file is not already present in the storage account when you run this for the first time; if it is present, it must be an Append blob.
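
A minimal sketch of one way to guarantee that, assuming the standard azure-storage-blob package and placeholder connection details: delete any leftover blob before the chunked writes start, or create the target explicitly as an append blob up front.

from azure.storage.blob import BlobClient

# Placeholder connection details; use the same container/blob as the write loop above.
blob = BlobClient.from_connection_string(
    conn_str='<storage connection string>',
    container_name='<container>',
    blob_name='outfromVS.csv',
)

# Remove a leftover Block blob from a previous run so the first append-mode
# write can create the blob fresh.
if blob.exists():
    blob.delete_blob()

# Alternatively, create the target as an empty Append blob up front, so every
# chunked write appends to a blob of the same type:
# blob.create_append_blob()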