I am following this "Get Started" TensorFlow tutorial on how to run TFDV on Apache Beam on Google Cloud Dataflow. My code is very similar to the one in the tutorial:

import tensorflow_data_validation as tfdv
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, WorkerOptions

PROJECT_ID = 'my-project-id'
JOB_NAME = 'my-job-name'
REGION = "europe-west3"
NETWORK = "regions/europe-west3/subnetworks/mysubnet"
GCS_STAGING_LOCATION = 'gs://my-bucket/staging'
GCS_TMP_LOCATION = 'gs://my-bucket/tmp'
GCS_DATA_LOCATION = 'gs://another-bucket/my-data.CSV'
# GCS_STATS_OUTPUT_PATH is the file path to which to output the data statistics
# result.
GCS_STATS_OUTPUT_PATH = 'gs://my-bucket/stats'

# Downloaded locally with:
#   pip download tensorflow_data_validation --no-deps --platform manylinux2010_x86_64 --only-binary=:all:
# (It would be great to have it on Cloud Storage instead.)
# PATH_TO_WHL_FILE = 'gs://my-bucket/wheels/tensorflow_data_validation-1.7.0-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl'
PATH_TO_WHL_FILE = '/Users/myuser/some-folder/tensorflow_data_validation-1.7.0-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl'


# Create and set your PipelineOptions.
options = PipelineOptions()

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = GCS_STAGING_LOCATION
google_cloud_options.temp_location = GCS_TMP_LOCATION
google_cloud_options.region = REGION
options.view_as(StandardOptions).runner = 'DataflowRunner'

setup_options = options.view_as(SetupOptions)
# PATH_TO_WHL_FILE should point to the downloaded tfdv wheel file.
setup_options.extra_packages = [PATH_TO_WHL_FILE]

# Worker options
worker_options = options.view_as(WorkerOptions)
worker_options.subnetwork = NETWORK
worker_options.max_num_workers = 2

print("Generating stats...")
tfdv.generate_statistics_from_tfrecord(GCS_DATA_LOCATION, output_path=GCS_STATS_OUTPUT_PATH, pipeline_options=options)
print("Stats generated!")

The code above starts a Dataflow job, but unfortunately it fails with the following error:

apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py", line 285, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'NumExamplesStatsGenerator' on <module 'tensorflow_data_validation.statistics.stats_impl' from '/usr/local/lib/python3.8/site-packages/tensorflow_data_validation/statistics/stats_impl.py'>

I couldn't find anything similar on the internet. In case it helps, on my local machine (macOS) I have the following versions:

Apache Beam version: 2.34.0
TensorFlow version: 2.6.2
TensorFlow Transform version: 1.4.0
TFDV version: 1.4.0

On the cloud, the job runs with the Apache Beam Python 3.8 SDK, version 2.34.0.

BONUS QUESTION: Another question I have is about PATH_TO_WHL_FILE. I tried putting the wheel in a storage bucket, but Beam doesn't seem to be able to pick it up from there; it only works with a local path. That is a real problem, because it makes this code harder to distribute. What would be a good practice for distributing this wheel file?

BEST ANSWER:

Based on the traceback, the class NumExamplesStatsGenerator is pickled by reference (module path plus attribute name) when the pipeline is built on your machine, and the workers fail to unpickle it because that attribute does not exist in their copy of tensorflow_data_validation.statistics.stats_impl.

The attribute exists in this module in TFDV 1.4.0 (the version installed locally) but not in the 1.7.0 wheel staged to the workers, so the two sides disagree about where the class lives. Aligning the versions, e.g. by upgrading the local TFDV to 1.7.0 to match the wheel, should fix the unpickling error.
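
A minimal sketch of that check, assuming the wheel filename from the question: it compares the TFDV version that pickles the pipeline locally against the version encoded in the staged wheel's name.

# Sketch: the classes referenced by the pickled pipeline must resolve on the
# workers, so the local TFDV version and the staged wheel must agree.
from importlib.metadata import version  # standard library since Python 3.8

local_tfdv_version = version('tensorflow-data-validation')  # e.g. '1.4.0'
staged_wheel = 'tensorflow_data_validation-1.7.0-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl'

if local_tfdv_version not in staged_wheel:
    raise RuntimeError(
        f'Local TFDV {local_tfdv_version} does not match the staged wheel; '
        'upgrade locally (pip install tensorflow_data_validation==1.7.0) '
        'or download a wheel matching the local version.'
    )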

PATH_TO_WHL_FILE points at a file that Beam stages and distributes to the Dataflow workers for execution, so you can use a file on GCS.
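
For example, a sketch assuming the wheel was first uploaded to the bucket (the gs:// path mirrors the commented-out one in the question):

# Sketch: stage the wheel directly from GCS instead of a local path.
# Upload it first, e.g.: gsutil cp <local wheel> gs://my-bucket/wheels/
setup_options = options.view_as(SetupOptions)
setup_options.extra_packages = [
    'gs://my-bucket/wheels/tensorflow_data_validation-1.7.0-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl'
]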