How do I pass a TensorFlow Dataset through a TensorFlow Transform pipeline?


I have implemented a custom TensorFlow Dataset for my raw data. I can download, prepare, and load the data as a tf.data.Dataset as follows:

import tensorflow_datasets

builder = tensorflow_datasets.builder("my_dataset")
builder.download_and_prepare()
ds = builder.as_dataset()

I want to transform this data in a TensorFlow Transform pipeline for model training. However, the only way I have found to pass the dataset into the transform pipeline is to convert it to instance dicts and supply the raw data metadata:

import tensorflow_transform.beam

# Materialize the whole dataset as a list of per-row dicts.
instance_dicts = tensorflow_datasets.as_dataframe(ds).to_dict(orient="records")
with tensorflow_transform.beam.Context():
    (transformed_data, _), transform_fn = (
        instance_dicts,
        RAW_DATA_METADATA,
    ) | tensorflow_transform.beam.AnalyzeAndTransformDataset(
        preprocessing_fn, output_record_batches=True
    )
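
For reference, `to_dict(orient="records")` materializes every row as a standalone Python dict, all held in memory at once, which is why this path does not scale. A minimal illustration with a stand-in DataFrame (the column names here mirror the feature spec used later, but the data is made up):

```python
import pandas as pd

# Stand-in for tensorflow_datasets.as_dataframe(ds).
df = pd.DataFrame({"token": ["a", "b"], "label": ["x", "y"]})

# Every row becomes its own dict; the entire dataset lives in memory.
records = df.to_dict(orient="records")
print(records)  # [{'token': 'a', 'label': 'x'}, {'token': 'b', 'label': 'y'}]
```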

Is there an easier and more memory efficient way of passing a TensorFlow Dataset to a TensorFlow Transform pipeline?

Answer by Pierce Edmiston:

In this case, the easier and more memory-efficient way to pass a TensorFlow Dataset to a TensorFlow Transform pipeline is to reference the TFRecord files written by the TensorFlow Datasets builder's download_and_prepare() step.

import apache_beam
import tensorflow_datasets
import tensorflow_transform.beam
from apache_beam.io import tfrecordio

examples_dir = tensorflow_datasets.builder("my_dataset").info.data_dir
examples_file_pattern = f"{examples_dir}/my_dataset-*"

with apache_beam.Pipeline() as pipeline:
    with tensorflow_transform.beam.Context():
        # Read the serialized tf.Example records that download_and_prepare() wrote.
        raw = pipeline | tfrecordio.ReadFromTFRecord(file_pattern=examples_file_pattern)
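
The glob above works because TFDS writes sharded TFRecord files named like `<dataset_name>-<split>.tfrecord-<shard>-of-<num_shards>`, so a single `my_dataset-*` pattern covers every split and shard. A quick sanity check with illustrative shard names (the filenames below are made up to match that convention):

```python
from fnmatch import fnmatch

# Hypothetical shard files, following the TFDS naming convention.
shards = [
    "my_dataset-train.tfrecord-00000-of-00002",
    "my_dataset-train.tfrecord-00001-of-00002",
    "my_dataset-test.tfrecord-00000-of-00001",
]

# One pattern matches every split and every shard.
matches = [name for name in shards if fnmatch(name, "my_dataset-*")]
print(matches)  # all three filenames match
```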

To transform the raw data, create a TFXIO from a feature spec.

import tensorflow
from tensorflow_transform.tf_metadata.schema_utils import schema_from_feature_spec
from tfx_bsl.public import tfxio

example_spec = {
    "token": tensorflow.io.FixedLenFeature([], tensorflow.string),
    "label": tensorflow.io.FixedLenFeature([], tensorflow.string),
}
schema = schema_from_feature_spec(example_spec)
tfexample_tfxio = tfxio.TFExampleBeamRecord(physical_format="tfrecord", schema=schema)

Then, inside the pipeline, decode the raw records into RecordBatches with the TFXIO's BeamSource, and pass its TensorAdapterConfig so TensorFlow Transform can convert each RecordBatch to tensors.

        # ...
        (transformed_data, _), transform_fn = (
            (raw | tfexample_tfxio.BeamSource()),
            tfexample_tfxio.TensorAdapterConfig(),
        ) | tensorflow_transform.beam.AnalyzeAndTransformDataset(
            preprocessing_fn, output_record_batches=True
        )