How can I ingest an Excel spreadsheet with multiple tabs?

I would like to ingest Excel files from a remote folder or an SFTP server. It works with CSV files but not with XLS or XLSX files.

The code below provides functions to transform each tab of an xls/xlsx file into a Spark dataframe and write it out as a dataset.

To use these functions you need to:

  1. Copy/paste the functions below into your repository (in a utils.py file, for instance)
  2. Create a new transform script
  3. In the transform script, copy/paste the example transform and adjust the parameters (the imports you will need are sketched right after this list)
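
For reference, the transform script also needs the usual transforms imports plus an import of the helper functions. The exact module path to utils.py depends on where you place it in your repository, so treat the second import below as an assumption to adjust:

from transforms.api import transform, Input, Output
# Adjust this path to wherever you placed utils.py in your repository (example path only)
from myproject.datasets.utils import write_datasets_from_excel_sheets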

Example transform to use the functions:

# Parameters for ingesting an Excel file with multiple tabs
SHEETS_PARAMETERS = {
    # Each block takes one tab of your Excel file ("Artists" here), reads the
    # column labels from the 0-indexed "header" row, and writes a dataset to the
    # path provided ("/Studio/studio_datasource/artists")
    "Artists": {
        "output_dataset_path": "/Studio/studio_datasource/artists",
        "header": 7
    },
    "Records": {
        "output_dataset_path": "/Studio/studio_datasource/records",
        "header": 0
    },
    "Albums": {
        "output_dataset_path": "/Studio/studio_datasource/albums",
        "header": 1
    }
}

# Define the dictionary of outputs needed by the transform's decorator
outputs = {
    sheet_parameter["output_dataset_path"]: Output(sheet_parameter["output_dataset_path"])
    for sheet_parameter in SHEETS_PARAMETERS.values()
}
@transform(
    my_input=Input("/Studio/studio_datasource/excel_file"),
    **outputs
)
def my_compute_function(my_input, ctx, **outputs):
    # Add the output objects to the parameters
    for sheetname, parameters in SHEETS_PARAMETERS.items():
        output_dataset_path = parameters["output_dataset_path"]
        parameters["output_dataset"] = outputs[output_dataset_path]

    # Transform the sheets to datasets
    write_datasets_from_excel_sheets(my_input, SHEETS_PARAMETERS, ctx)

Functions:

import pandas as pd
import tempfile
import shutil

def normalize_column_name(cn):
    """
    Remove forbidden characters from the columns names
    """
    invalid_chars = " ,;{}()\n\t="
    for c in invalid_chars:
        cn = cn.replace(c, "_")
    return cn
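
# Illustrative example (not from the original post): spaces and parentheses become
# underscores, e.g. normalize_column_name("Album Name (2020)") returns
# "Album_Name__2020_".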

def get_dataframe_from_excel_sheet(fp, ctx, sheet_name, header):
    """
    Generate a Spark dataframe from a sheet in an excel file available in Foundry
    Arguments:
        fp:
            TemporaryFile object from which the Excel file can be read
        ctx:
            Context object available in a transform
        sheet_name:
            Name of the sheet
        header:
            Row (0-indexed) to use for the column labels of the parsed DataFrame.
            If a list of integers is passed those row positions will be combined into a MultiIndex.
            Use None if there is no header.
    """
    # Read the requested sheet into a pandas dataframe
    # (recent pandas versions no longer accept an "encoding" argument here)
    dataframe = pd.read_excel(
        fp,
        sheet_name,
        header=header
    )

    # Cast every cell to a string ("unicode" only exists in Python 2; use str on Python 3)
    dataframe = dataframe.applymap(str)

    # Create a dataframe in Spark from the pandas dataframe
    dataframe = ctx.spark_session.createDataFrame(dataframe)

    # Remove invalid characters from the column names (these are Foundry restrictions)
    columns = dataframe.columns
    for col in columns:
        dataframe = dataframe.withColumnRenamed(col, normalize_column_name(col))
    return dataframe

def write_datasets_from_excel_sheets(input_dataframe, sheet_parameters, ctx):
    """
    Write one output dataset per configured sheet of an Excel file available in Foundry
    Arguments:
        input_dataframe:
            Input resource which contains the Excel file
        sheet_parameters:
            Dictionary of per-sheet parameters. Each key is the name of a tab in the
            Excel spreadsheet and its value holds the parameters for that tab. In the
            example below, "Artists" is the name of a tab; the script will read the
            column labels from row number 7 (0-indexed) and write the output to
            /Studio/studio_datasource/artists.
            e.g.
                {
                    "Artists": {
                        "output_dataset_path": "/Studio/studio_datasource/artists",
                        "header": 7
                    },
                    "Records": {
                        "output_dataset_path": "/Studio/studio_datasource/records",
                        "header": 0
                    },
                    "Albums": {
                        "output_dataset_path": "/Studio/studio_datasource/albulms",
                        "header": 1
                    }
                }
        ctx:
            Transform context
    e.g.
        # Parameters for ingesting an Excel file with multiple tabs
        SHEETS_PARAMETERS = {
            "Artists": {
                "output_dataset_path": "/Studio/studio_datasource/artists",
                "header": 7
            },
            "Records": {
                "output_dataset_path": "/Studio/studio_datasource/records",
                "header": 0
            },
            "Albums": {
                "output_dataset_path": "/Studio/studio_datasource/albums",
                "header": 1
            }
        }

        # Define the dictionary of outputs needed by the transform's decorator
        outputs = {
            sheet_parameter["output_dataset_path"]: Output(sheet_parameter["output_dataset_path"])
            for sheet_parameter in SHEETS_PARAMETERS.values()
        }
        @transform(
            my_input=Input("/Studio/studio_datasource/excel_file"),
            **outputs
        )
        def my_compute_function(my_input, ctx, **outputs):
            # Add the output objects to the parameters
            for sheetname, parameters in SHEETS_PARAMETERS.items():
                output_dataset_path = parameters["output_dataset_path"]
                parameters["output_dataset"] = outputs[output_dataset_path]

            # Transform the sheets to datasets
            write_datasets_from_excel_sheets(my_input, SHEETS_PARAMETERS, ctx)
    """

    # Open the Excel file (this assumes the input dataset contains a single file)
    files = list(input_dataframe.filesystem().ls())
    file_path = files[0].path

    with input_dataframe.filesystem().open(file_path, 'rb') as f:
        # Read the input file from Foundry File System
        fp = tempfile.TemporaryFile()
        shutil.copyfileobj(f, fp)
        fp.seek(0)

        # For each sheet in the excel file, create a dataframe and write it
        for sheet_name, sheet_parameter in sheet_parameters.items():
            # Generate dataframe from a single sheet in the excel file
            dataframe = get_dataframe_from_excel_sheet(fp, ctx, sheet_name, sheet_parameter["header"])
            # Write dataset
            sheet_parameter["output_dataset"].write_dataframe(dataframe)

        # Close the file
        fp.close()
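
Note that the function above simply takes the first file returned by input_dataframe.filesystem().ls(), so it assumes the input dataset contains only the Excel file. If other files may be present, a minimal sketch (using only the same ls() call already shown above) is to filter on the file extension before picking a file:

    # Sketch: keep only .xls/.xlsx files instead of taking files[0] blindly
    excel_files = [
        f for f in input_dataframe.filesystem().ls()
        if f.path.lower().endswith((".xls", ".xlsx"))
    ]
    file_path = excel_files[0].path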