Op type not registered 'IO>BigQueryClient' with BigQuery connector on AI Platform


I'm trying to parallelize the training step of my model with TensorFlow's ParameterServerStrategy. I use GCP AI Platform to create the cluster and launch the task. As my dataset is huge, I use the BigQuery TensorFlow connector included in tensorflow-io.

My script is inspired by the documentation of the TensorFlow BigQuery reader and the documentation of TensorFlow ParameterServerStrategy.

Locally my script works well, but when I launch it on AI Platform I get the following error:

{"created":"@1633444428.903993309","description":"Error received from peer ipv4:10.46.92.135:2222","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Op type not registered \'IO>BigQueryClient\' in binary running on gke-cml-1005-141531--n1-standard-16-2-644bc3f8-7h8p. Make sure the Op and Kernel are registered in the binary running in this process. Note that if you are loading a saved graph which used ops from tf.contrib, accessing (e.g.) `tf.contrib.resampler` should be done before importing the graph, as contrib ops are lazily registered when the module is first accessed.","grpc_status":5}

The script works with fake data on AI Platform and works locally with the BigQuery connector. I suspect that building the model with the BigQuery connector and dispatching its calls to the other devices triggers the bug, but I don't know how to fix it.

I read that this error can happen when devices don't run the same TensorFlow version, so I checked the tensorflow and tensorflow-io versions on each device:

tensorflow : 2.5.0

tensorflow-io : 0.19.1
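
For reference, here is roughly how I check the versions on each task (a quick sketch that just logs what each process has installed):

# Version check sketch: every task (chief, worker, ps) prints the TensorFlow
# and tensorflow-io versions it actually imports.
import tensorflow as tf
import tensorflow_io as tfio

print('tensorflow:', tf.__version__)
print('tensorflow-io:', tfio.__version__)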

I created a minimal example which reproduces the bug on AI Platform:

import os
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
import tensorflow as tf

import multiprocessing
import portpicker
from tensorflow.keras.layers.experimental import preprocessing

from google.cloud import bigquery

from tensorflow.python.framework import dtypes

import numpy as np
import pandas as pd

client = bigquery.Client()

PROJECT_ID = '<your_project>'  # placeholder: replace with your GCP project ID
DATASET_ID = 'tmp'
TABLE_ID = 'bq_tf_io'

BATCH_SIZE = 32

# Bigquery requirements
def init_bq_table():
    table = '%s.%s.%s' %(PROJECT_ID, DATASET_ID, TABLE_ID)
    # Create toy_data
    def create_toy_data(N):
        x = np.random.random(size = N)
        y = 0.2 + x + np.random.normal(loc=0, scale = 0.3, size = N)
        return x, y
    x, y = create_toy_data(1000)
    df = pd.DataFrame(data = {'x': x, 'y': y})

    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE",)
    job = client.load_table_from_dataframe( df, table, job_config=job_config )
    job.result()

# Create initial data
#init_bq_table()

CSV_SCHEMA = [
      bigquery.SchemaField("x", "FLOAT64"),
      bigquery.SchemaField("y", "FLOAT64"),
  ]

def transform_row(row_dict):
  # Trim all string tensors
  dataset_x = row_dict
  dataset_x['constant'] = tf.cast(1, tf.float64)
  # Extract feature column
  dataset_y = dataset_x.pop('y')

  #Export as tensor
  dataset_x = tf.stack([dataset_x[column] for column in dataset_x], axis=-1)

  return (dataset_x, dataset_y)

def read_bigquery(table_name):
  tensorflow_io_bigquery_client = BigQueryClient()
  read_session = tensorflow_io_bigquery_client.read_session(
      "projects/" + PROJECT_ID,
      PROJECT_ID, TABLE_ID, DATASET_ID,
      list(field.name for field in CSV_SCHEMA),
      list(dtypes.double if field.field_type == 'FLOAT64'
           else dtypes.string for field in CSV_SCHEMA),
      requested_streams=2)

  dataset = read_session.parallel_read_rows()
  return dataset

def get_data():
    dataset = read_bigquery(TABLE_ID)
    dataset = dataset.map(transform_row, num_parallel_calls=4)
    dataset = dataset.batch(BATCH_SIZE).prefetch(2)
    return dataset

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

# parameter server and worker just wait jobs from the coordinator (chief)
if cluster_resolver.task_type in ("worker",):
    worker_config = tf.compat.v1.ConfigProto()

    server = tf.distribute.Server(
        cluster_resolver.cluster_spec(),
        job_name=cluster_resolver.task_type,
        task_index=cluster_resolver.task_id,
        config=worker_config,
        protocol="grpc")
    server.join()

elif cluster_resolver.task_type in ("ps",):
    server = tf.distribute.Server(
        cluster_resolver.cluster_spec(),
        job_name=cluster_resolver.task_type,
        task_index=cluster_resolver.task_id,
        protocol="grpc")
    server.join()

elif cluster_resolver.task_type == 'chief':
    strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver=cluster_resolver)

if cluster_resolver.task_type == 'chief':

    learning_rate = 0.01
    with strategy.scope():
        # model
        model_input = tf.keras.layers.Input(
            shape=(2,), dtype=tf.float64)
        layer_1 = tf.keras.layers.Dense( 8, activation='relu')(model_input)
        dense_output = tf.keras.layers.Dense(1)(layer_1)
        model = tf.keras.Model(model_input, dense_output)

        #optimizer
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate)

        accuracy = tf.keras.metrics.MeanSquaredError()

    @tf.function
    def distributed_train_step(iterator):
        def train_step(x_batch_train, y_batch_train):
            with tf.GradientTape() as tape:
                y_predict = model(x_batch_train, training=True)
                loss_value = tf.keras.losses.MeanSquaredError(reduction=tf.keras.losses.Reduction.NONE)(y_batch_train, y_predict)
                grads = tape.gradient(loss_value, model.trainable_weights)
            optimizer.apply_gradients(zip(grads, model.trainable_weights))
            accuracy.update_state(y_batch_train, y_predict)
            return loss_value
        x_batch_train, y_batch_train = next(iterator)
        return strategy.run(train_step, args=(x_batch_train, y_batch_train))

    coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)


    #test
    def dataset_fn(_):
        def create_toy_data(N):
            x = np.random.random(size = N)
            y = 0.2 + x + np.random.normal(loc=0, scale = 0.3, size = N)
            return np.c_[x,y]
        def toy_transform_row(row):
            dataset_x = tf.stack([row[0], tf.cast(1, tf.float64)], axis=-1)
            dataset_y = row[1]
            return dataset_x, dataset_y
        N = 1000
        data = create_toy_data(N)
        dataset = tf.data.Dataset.from_tensor_slices(data)
        dataset = dataset.map(toy_transform_row, num_parallel_calls=4)
        dataset = dataset.batch(BATCH_SIZE)
        dataset = dataset.prefetch(2)
        return dataset

    @tf.function
    def per_worker_dataset_fn():
        return strategy.distribute_datasets_from_function(lambda x : get_data()) # <-- Not working with AI platform
        #return strategy.distribute_datasets_from_function(dataset_fn) # <-- Working with AI platform

    per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)

    # Train model
    for epoch in range(5):
        per_worker_iterator = iter(per_worker_dataset)
        accuracy.reset_states()
        for step in range(5):
            coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
        coordinator.join()
        print ("Finished epoch %d, accuracy is %f." % (epoch, accuracy.result().numpy()))

In per_worker_dataset_fn() I can either build the dataset with the BigQuery connector (which fails) or build it on the fly from toy data (which works).

AI Platform cluster configuration:

runtimeVersion: "2.5"
pythonVersion: "3.7"

Has anyone run into this issue? The BigQuery connector worked fine with MirroredStrategy on AI Platform. Let me know if I should report the issue somewhere else.

There are 2 answers below.

Accepted answer:

I think this is due to lazy loading of libtensorflow_io.so. https://github.com/tensorflow/io/commit/85d018ee59ceccfae06914ec2a2f6d6583775ff7

Can you try adding something like this to your code:

import tensorflow_io
tensorflow_io.experimental.oss()
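
For example, against your reproduction script, the important part as far as I can tell is that this import runs in every process (chief, worker and ps) before the graph is built or tf.distribute.Server starts, so the 'IO>BigQueryClient' op gets registered in each binary. A rough sketch of the top of the script:

# Sketch: force tensorflow-io's shared library (and its ops) to load in this process.
# Keep this at the very top so chief, worker and ps all register the BigQuery ops.
import tensorflow_io
tensorflow_io.experimental.oss()

from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
import tensorflow as tf
# ... rest of the script unchanged ...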

Second answer:

As far as I understand, this happens because when you submit your training job to Cloud AI Platform Training, it uses a stock TensorFlow 2.5 environment that doesn't have the tensorflow-io package installed. Therefore it complains that it doesn't know about the 'IO>BigQueryClient' op defined in the tensorflow-io package.

Instead, you can submit your training job using a custom container: https://cloud.google.com/ai-platform/training/docs/custom-containers-training

You don't need to write a new Dockerfile; you can use gcr.io/deeplearning-platform-release/tf-cpu.2-5 or gcr.io/deeplearning-platform-release/tf-gpu.2-5 (if your training job needs GPUs), which already have the right version of tensorflow-io installed.

You can read more about these containers here: https://cloud.google.com/tensorflow-enterprise/docs/use-with-deep-learning-containers
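
For illustration, a config.yaml passed with gcloud ai-platform jobs submit training --config config.yaml could look roughly like this (a sketch only; the machine types and replica counts are placeholders, and masterConfig/workerConfig/parameterServerConfig are the standard custom-container fields described in the docs linked above):

trainingInput:
  scaleTier: CUSTOM
  masterType: n1-standard-16
  masterConfig:
    imageUri: gcr.io/deeplearning-platform-release/tf-cpu.2-5
  workerType: n1-standard-16
  workerCount: 2
  workerConfig:
    imageUri: gcr.io/deeplearning-platform-release/tf-cpu.2-5
  parameterServerType: n1-standard-16
  parameterServerCount: 1
  parameterServerConfig:
    imageUri: gcr.io/deeplearning-platform-release/tf-cpu.2-5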

Here is my old example showing how to run distributed training on Cloud AI using BigQueryReader: https://github.com/vlasenkoalexey/criteo/blob/master/scripts/train-cloud.sh

It is no longer maintained, but it should give you a general idea of what it should look like.