I want to create a CNN model with Keras to classify images based on two views. The training data are provided as batched datasets. Below is the code I have used, but it does not work correctly:

import keras
import tensorflow as tf
import keras.layers as layers
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential

#image width and height
img_width, img_height = 80, 80

#batch size
batch_size = 32


#import train and test data for both views:

#view 1
data_train_1 = tf.keras.preprocessing.image_dataset_from_directory(
    '_view_1', labels='inferred', label_mode='int', class_names=None,
    color_mode='rgb', batch_size=batch_size, image_size=(img_width, img_height),
    shuffle=True, seed=123, validation_split=0.2, subset="training",
    interpolation='bilinear', follow_links=False)

data_validation_1 = tf.keras.preprocessing.image_dataset_from_directory(
    '_view_1', labels='inferred', label_mode='int', class_names=None,
    color_mode='rgb', batch_size=batch_size, image_size=(img_width, img_height),
    shuffle=True, seed=123, validation_split=0.2, subset="validation",
    interpolation='bilinear', follow_links=False)

#view 2
data_train_2 = tf.keras.preprocessing.image_dataset_from_directory(
    '_view_2', labels='inferred', label_mode='int', class_names=None,
    color_mode='rgb', batch_size=batch_size, image_size=(img_width, img_height),
    shuffle=True, seed=123, validation_split=0.2, subset="training",
    interpolation='bilinear', follow_links=False)

data_validation_2 = tf.keras.preprocessing.image_dataset_from_directory(
    '_view_2', labels='inferred', label_mode='int', class_names=None,
    color_mode='rgb', batch_size=batch_size, image_size=(img_width, img_height),
    shuffle=True, seed=123, validation_split=0.2, subset="validation",
    interpolation='bilinear', follow_links=False)

#class names and number of classes (inferred from the training dataset)
class_names = data_train_1.class_names
num_classes = len(class_names)

#Define the CNN model which will be shared
shared_cnn = keras.Sequential([
    layers.Rescaling(1./255),
    layers.Conv2D(16, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(32, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),])

#Define the two input views
view1_input = keras.Input(shape=(img_height, img_width, 3))
view2_input = keras.Input(shape=(img_height, img_width, 3))

#Processing of each view through the shared CNN
view1_features = shared_cnn(view1_input)
view2_features = shared_cnn(view2_input)

#Merge the features from both views
Merged_features = layers.concatenate([view1_features, view2_features],axis=1)

#Add dense layers for classification
predictions = layers.Dense(num_classes, activation='sigmoid')(Merged_features)

#Model creation
_model = keras.Model(inputs=[view1_input, view2_input], outputs=predictions)

#Loss function
def contrastive_loss(y_true, y_pred):
    margin = 1.0  # Adjust this margin based on your problem
    return tf.reduce_mean(y_true * tf.square(y_pred) + (1 - y_true) * tf.square(tf.maximum(margin - y_pred, 0)))

#Model compilation
optimizer = keras.optimizers.Adam()
_model.compile(optimizer=optimizer, loss=contrastive_loss)

#epochs
epochs = 10

#Model training
#Assuming data_train_1 and data_train_2 are the two tf.data.Dataset objects
combined_dataset = tf.data.Dataset.zip((data_train_1, data_train_2))

for epoch in range(epochs):
    for (view1_batch, view2_batch), labels in combined_dataset.batch(batch_size):
        with tf.GradientTape() as tape:
            predictions = _model([view1_batch, view2_batch])
            loss = contrastive_loss(labels, predictions)
        gradients = tape.gradient(loss, _model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, _model.trainable_variables))

The error I received is:

ValueError: Input 0 of layer is incompatible with the layer: expected shape=(None, 80, 80, 3), found shape=(32, 32, 80, 80, 3)

It seems that an extra batch dimension is being added, which is not what I intended. Could you please help me solve this issue?
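For reference, inspecting the element spec of the zipped and re-batched dataset seems to confirm the extra dimension (a quick check using the datasets defined above):

#The image specs come out with two leading batch dimensions, roughly
#TensorSpec(shape=(None, None, 80, 80, 3), ...), while the model expects (None, 80, 80, 3).
print(tf.data.Dataset.zip((data_train_1, data_train_2)).batch(batch_size).element_spec)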

1 Answer
The problem comes from the fact that you are batching twice: image_dataset_from_directory already returns batched data, and then you call .batch(batch_size) again on the zipped dataset. In all of your dataset-creation calls, set batch_size=None so the datasets yield individual images:

data_train_2 = tf.keras.preprocessing.image_dataset_from_directory(
    r'path/to/your/dir',
    labels='inferred', label_mode='int',
    class_names=None, color_mode='rgb',
    batch_size=None,
    image_size=(img_width, img_height), shuffle=True,
    seed=123, validation_split=0.2, subset="training",
    interpolation='bilinear', follow_links=False
)
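With batch_size=None the dataset yields individual (image, label) pairs, which you can verify before zipping (a quick sanity check, assuming the variables defined above):

#Each element should now be a single image with spec (img_height, img_width, 3),
#i.e. no leading batch dimension.
print(data_train_2.element_spec)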

Then, there was some confusion with the labels in your training loop: the zipped dataset yields ((images_1, labels_1), (images_2, labels_2)) tuples, so your original unpacking assigned the view-1 labels to view2_batch. Replace the loop as follows:

for epoch in range(epochs):
    for (view1_x, view1_y), (view2_x, view2_y) in combined_dataset.batch(batch_size):
        with tf.GradientTape() as tape:
            predictions = _model([view1_x, view2_x])
            loss = contrastive_loss(
                tf.expand_dims(tf.cast(view2_y, tf.float32), axis=1),
                tf.cast(predictions, tf.float32)
            )
        gradients = tape.gradient(loss, _model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, _model.trainable_variables))

For each batch you get two ground-truth label tensors; I'm assuming they are identical, so only view2_y is passed to contrastive_loss.
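If you prefer not to write the loop by hand, you can also map the zipped dataset into the ((view1_x, view2_x), y) structure that model.fit expects. This is only a sketch, under the same assumption that both views carry identical labels:

#Sketch: reshape each zipped batch into (inputs, target) for model.fit.
#Assumes both views share the same labels, so view2_y is used as the target.
def to_model_inputs(view1, view2):
    view1_x, view1_y = view1
    view2_x, view2_y = view2
    return (view1_x, view2_x), tf.expand_dims(tf.cast(view2_y, tf.float32), axis=1)

train_ds = (tf.data.Dataset.zip((data_train_1, data_train_2))
            .batch(batch_size)
            .map(to_model_inputs)
            .prefetch(tf.data.AUTOTUNE))

_model.fit(train_ds, epochs=epochs)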

Full simplified example:

import keras
import tensorflow as tf
import keras.layers as layers

img_width, img_height = 25, 25
batch_size = 32

data_train_1 = tf.keras.preprocessing.image_dataset_from_directory(
    r'datasets\mnist\train',
    batch_size=None,
    image_size=(img_width, img_height)
)

data_train_2 = tf.keras.preprocessing.image_dataset_from_directory(
    r'datasets\mnist\train',
    batch_size=None,
    image_size=(img_width, img_height)
)

num_classes = 10

shared_cnn = keras.Sequential([
    layers.Rescaling(1. / 255),
    layers.Conv2D(16, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Flatten(),
    layers.Dense(32, activation='relu'), ])

view1_input = keras.Input(shape=(img_height, img_width, 3))
view2_input = keras.Input(shape=(img_height, img_width, 3))

view1_features = shared_cnn(view1_input)
view2_features = shared_cnn(view2_input)

Merged_features = layers.concatenate([view1_features, view2_features], axis=1)

predictions = layers.Dense(num_classes, activation='sigmoid')(Merged_features)

_model = keras.Model(inputs=[view1_input, view2_input], outputs=predictions)


def contrastive_loss(y_true, y_pred):
    margin = 1.0
    return tf.reduce_mean(y_true * tf.square(y_pred) + (1 - y_true) * tf.square(tf.maximum(margin - y_pred, 0)))


optimizer = keras.optimizers.Adam()
_model.compile(optimizer=optimizer, loss=contrastive_loss)

epochs = 10

combined_dataset = tf.data.Dataset.zip((data_train_1, data_train_2))

for epoch in range(epochs):
    for (view1_x, view1_y), (view2_x, view2_y) in combined_dataset.batch(batch_size):
        with tf.GradientTape() as tape:
            predictions = _model([view1_x, view2_x])
            loss = contrastive_loss(
                tf.expand_dims(tf.cast(view2_y, tf.float32), axis=1),
                tf.cast(predictions, tf.float32)
            )
        gradients = tape.gradient(loss, _model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, _model.trainable_variables))
        print(loss.numpy())
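
If the eager loop above is slow, the per-batch step can optionally be wrapped in tf.function so it runs as a compiled graph. This sketch keeps the same logic as the loop in the example:

#Optional speed-up: compile the training step into a graph with tf.function.
#Same computation as the eager loop above.
@tf.function
def train_step(view1_x, view2_x, labels):
    with tf.GradientTape() as tape:
        preds = _model([view1_x, view2_x], training=True)
        loss = contrastive_loss(
            tf.expand_dims(tf.cast(labels, tf.float32), axis=1),
            tf.cast(preds, tf.float32)
        )
    gradients = tape.gradient(loss, _model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, _model.trainable_variables))
    return loss

for epoch in range(epochs):
    for (view1_x, view1_y), (view2_x, view2_y) in combined_dataset.batch(batch_size):
        loss = train_step(view1_x, view2_x, view2_y)
    print(f"epoch {epoch + 1}/{epochs}, last batch loss: {loss.numpy():.4f}")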