Luigi task not writing pandas df to csv

561 Views Asked by At

I have the following code to simplify an Excel file and return only the required columns. It is written as a Luigi task containerized with Docker, and it is not producing the CSV file even though the _SUCCESS flag is being created.

Function Code:

def _save_datasets(simplified, outdir: Path, flag):
    """Write the simplified dataset as CSV, then create the flag file.

    Args:
        simplified: Object exposing ``to_csv`` (a pandas DataFrame here).
        outdir: Directory that receives ``transformed.csv`` and the flag
            file. It is created, including parents, when missing.
        flag: File name of the completion marker, relative to ``outdir``.
    """
    outdir = Path(outdir)
    # Writing into a directory that does not exist fails, so create it first.
    outdir.mkdir(parents=True, exist_ok=True)
    # No trailing slash: the target is a regular file, not a directory.
    out_clean = outdir / 'transformed.csv'
    flag_path = outdir / flag
    simplified.to_csv(str(out_clean), index=False)
    # Touch the flag only after the CSV has been written, so the marker
    # is never created when writing the data raised an error.
    flag_path.touch()

def _simplify_points(points):
    """Map a raw points value onto the simplified 1-5 scale."""
    if points < 84:
        return 1
    if 84 <= points < 88:
        return 2
    if 88 <= points < 92:
        return 3
    if 92 <= points < 96:
        return 4
    # Everything that matched none of the ranges above (96 and up).
    return 5


@click.command()
@click.option('--in-csv')
@click.option('--out-dir')
@click.option('--flag')
def transform_data(in_csv, out_dir, flag):
    """Reduce the input CSV to description/points plus a simplified score.

    Reads ``in_csv``, keeps the ``description`` and ``points`` columns, adds
    a ``points_simplified`` column derived from ``points`` and hands the
    result to ``_save_datasets`` together with ``out_dir`` and ``flag``.
    """
    out_dir = Path(out_dir)
    data = pd.read_csv(in_csv)
    req_dp = data[['description', 'points']]
    # The original referenced an undefined name ``dp`` here, which raised
    # NameError before anything was saved; the column lives on ``req_dp``.
    simplified = req_dp.assign(
        points_simplified=req_dp['points'].apply(_simplify_points)
    )
    _save_datasets(simplified, out_dir, flag)

Luigi Task code:

#Transform
class TransformData(DockerTask):
    """Task that simplifies the datasets.

    Exposes the image name and command line for the transform step and
    uses a flag file inside ``out_dir`` as its output target.
    """

    # Directory prefix used to build the default input CSV path.
    in_path = '/usr/share/data/created_csv/'
    in_csv = luigi.Parameter(default=in_path + 'cleaned.csv')
    out_dir = luigi.Parameter(default='/usr/share/data/created_csv/')
    flag = luigi.Parameter(default='.SUCCESS_TransformData')

    def requires(self):
        """Declare ``CleanData`` as the upstream dependency."""
        upstream = CleanData()
        return upstream

    def output(self):
        """Return the flag file under ``out_dir`` as the task's target."""
        flag_path = Path(self.out_dir) / self.flag
        return luigi.LocalTarget(path=str(flag_path))

    @property
    def image(self):
        """Image name for this step, tagged with ``VERSION``."""
        return 'code-chal/transform-data:{}'.format(VERSION)

    @property
    def command(self):
        """Argument list: the script followed by its three options."""
        # NOTE(review): this launches clean_data.py although the task is the
        # transform step -- confirm the script name matches the image content.
        argv = ['python', 'clean_data.py']
        argv.extend(['--in-csv', self.in_csv])
        argv.extend(['--out-dir', self.out_dir])
        argv.extend(['--flag', self.flag])
        return argv

The Luigi task moves on to the next task because the _SUCCESS flag is created, but the next task fails since it depends on the transformed.csv file, which isn't being created.

Thanks

1

There is 1 best solution below

0
On

In your Luigi task you need a run method that saves the file you want, using the target returned by the output method.

So you need to add:

def run(self):
    """Run the transformation so that the task's output target is created.

    The output directory and flag name are derived from the path of
    ``self.output()``, so the flag written by the transformation is the
    same file the task reports as its target.
    """
    # NOTE(review): on a DockerTask subclass, overriding run() presumably
    # replaces the container execution -- confirm this is intended.
    target = Path(self.output().path)
    # transform_data is a click command: calling the command object would
    # treat positional arguments as a CLI argument list, so invoke the
    # wrapped function through ``callback`` instead. No file handle is
    # opened here; the callee creates the flag after the CSV is saved.
    transform_data.callback(
        in_csv=self.in_csv,
        out_dir=target.parent,
        flag=target.name,
    )