How can I run my Apache Beam pipeline with a local CSV file when using TensorFlow Extended?


The CSV data has to be decoded before it can be passed to TFT, and Beam's TextIO doesn't work stand-alone. So how does this work?


There are 1 best solutions below

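import apache_beam as beam
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils

# Not defined in this snippet; they must exist elsewhere in your code:
#   CSV                - ordered list of the CSV column names
#   feature_specs      - dict mapping column name -> TensorFlow feature spec
#   input_path         - path to the local CSV file
#   MapAndFilterErrors - helper PTransform that applies a function and drops
#                        failing rows (presumably the one from the TFT census example)

# Build the metadata before the pipeline, because the coder needs its schema.
raw_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec(feature_specs))

def run_pipeline():  # renamed from beam() so it does not shadow the beam module
    output_path = "./output"
    options = PipelineOptions()
    options.view_as(StandardOptions).runner = "DirectRunner"  # run locally

    with beam.Pipeline(options=options) as pipeline:
        with tft_beam.Context(temp_dir="./tmp"):
            # Assumes a TFT version whose CsvCoder still provides decode().
            converter = tft.coders.CsvCoder(CSV, raw_metadata.schema)
            raw_data = (
                pipeline
                | 'ReadTrainData' >> beam.io.ReadFromText(input_path, skip_header_lines=1)
                | 'FixCommasTrainData' >> beam.Map(lambda line: line.replace(', ', ','))
                | 'DecodeTrainData' >> MapAndFilterErrors(converter.decode))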
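
To make it clearer what the 'FixCommasTrainData' and 'DecodeTrainData' steps do to each line of text, here is a rough plain-Python sketch of the same idea, without Beam or TFT. The column names, the sample lines, and the `decode_line` / `decode_lines` helpers are made up for illustration and are not part of any library. Unlike the real coder, this sketch leaves every value as a string, whereas the coder would convert values according to the schema.

```python
import csv

# Hypothetical column names; in the answer above, the CSV variable holds the real ones.
COLUMNS = ["age", "city", "income"]

def decode_line(line, columns=COLUMNS):
    """Turn one CSV text line into a {column: value} dict, or None if malformed."""
    cleaned = line.replace(", ", ",")      # same fix as 'FixCommasTrainData'
    fields = next(csv.reader([cleaned]))   # parse a single CSV record
    if len(fields) != len(columns):
        return None                        # wrong number of fields
    return dict(zip(columns, fields))

def decode_lines(lines):
    """Decode every line and drop the ones that fail, like a map-and-filter step."""
    decoded = (decode_line(line) for line in lines)
    return [row for row in decoded if row is not None]

# Made-up sample input: two valid rows and one malformed line.
rows = decode_lines(["39, Berlin, 50000", "broken line", "52, Paris, 61000"])
```

In the Beam pipeline the same thing happens per element of the PCollection: `ReadFromText` yields one string per line, and the decode step turns each string into a dict that TFT can consume.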

See the official TFX guide for more details.