Pass information between pipeline steps in sklearn


I am working on a simple text generation problem with LSTMs. To make the preprocessing more compact and reproducible, I decided to implement everything in sklearn fashion, using custom sklearn transformers, and the KerasClassifier from scikeras to wrap the neural network definition in a sklearn-type estimator.

It almost works, but I can't figure out how to pass information from inside a custom transformer on to the KerasClassifier estimator. More precisely, the function that creates the neural network needs the number of outputs as an argument, but this depends on the number of words in the fitted vocabulary, and that information is currently encapsulated in the ModelEncoder class.
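A minimal sketch of that dependency, using a pure-Python stand-in for the fitted tokenizer. The helper `fit_vocabulary` and the three sample strings are hypothetical illustrations, not part of the pipeline; the only convention borrowed from the Keras Tokenizer is that word indices start at 1 and index 0 is left unused.

```python
from collections import Counter


def fit_vocabulary(texts):
    """Build a word -> index map, most frequent word first, indices from 1.

    Hypothetical stand-in for a fitted tokenizer. Index 0 is left unused,
    as in the Keras Tokenizer; ties are broken alphabetically here, which
    is a choice made only for this sketch.
    """
    counts = Counter(word for text in texts for word in text.split())
    ordered = sorted(counts, key=lambda w: (-counts[w], w))
    return {word: i for i, word in enumerate(ordered, start=1)}


sample = ["what while you were there", "what likelihood", "o find him"]
word_index = fit_vocabulary(sample)

# A softmax layer needs one unit per possible integer label. The labels run
# from 1 to len(word_index) and 0 is reserved, so the layer needs
# len(word_index) + 1 units. This number exists only after fitting.
vocab_size = len(word_index) + 1
print(vocab_size)
```

The value cannot be written into the pipeline definition up front, because it is a by-product of fitting an earlier step.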

(Note that to get the current logic to work, I had to slightly modify the default sklearn Pipeline class, as it wouldn't allow modifying and returning both X and y. In other words, the default sklearn Pipeline only allows feature transformations, not target transformations. The modification to the Pipeline class follows an approach explained in a StackOverflow post.)

Example data:

train_data = [
    'o by no means honest ventidius i gave it freely ever and theres none can truly say he gives if our betters play at that game we must not dare to imitate them faults that are rich are fair',
    'but was not this nigh shore',
    'impairing henry strengthening misproud york the common people swarm like summer flies and whither fly the gnats but to the sun',
    'what while you were there',
    'chill pick your teeth zir come no matter vor your foins',
    'thanks dear isabel',
    'come prick me bullcalf till he roar again',
    'go some of you knock at the abbeygate and bid the lady abbess come to me',
    'an twere not as good deed as drink to break the pate on thee i am a very villain',
    'beaufort it is thy sovereign speaks to thee',
    'but say lucetta now we are alone wouldst thou then counsel me to fall in love',
    'for being a bawd for being a bawd',
    'all blest secrets all you unpublishd virtues of the earth spring with my tears',
    'what likelihood',
    'o find him',
]

Full code:

# Modify the sklearn Pipeline class to allow it to return tuples and hence enable both X and y modifications. (Current default implementation in sklearn only allows
# feature transformations, i.e. transformations on X, but not on y.)
class Pipeline(pipeline.Pipeline):

    def _fit(self, X, y=None, **fit_params_steps):
        self.steps = list(self.steps)
        self._validate_steps()
        memory = check_memory(self.memory)

        fit_transform_one_cached = memory.cache(pipeline._fit_transform_one)

        for (step_idx, name, transformer) in self._iter(
            with_final=False, filter_passthrough=False
        ):
                        
            if transformer is None or transformer == "passthrough":
                with _print_elapsed_time("Pipeline", self._log_message(step_idx)):
                    continue

            try:
                # joblib >= 0.12
                mem = memory.location
            except AttributeError:
                mem = memory.cachedir
            finally:
                cloned_transformer = clone(transformer) if mem else transformer

            X, fitted_transformer = fit_transform_one_cached(
                cloned_transformer,
                X,
                y,
                None,
                message_clsname="Pipeline",
                message=self._log_message(step_idx),
                **fit_params_steps[name],
            )
            
            if isinstance(X, tuple):    ###### unpack X if is tuple X = (X,y)
                X, y = X
            
            self.steps[step_idx] = (name, fitted_transformer)
        
        return X, y
    
    def fit(self, X, y=None, **fit_params):
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt = self._fit(X, y, **fit_params_steps)
        
        if isinstance(Xt, tuple):    ###### unpack X if is tuple X = (X,y)
            Xt, y = Xt 
        
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if self._final_estimator != "passthrough":
                fit_params_last_step = fit_params_steps[self.steps[-1][0]]
                self._final_estimator.fit(Xt, y, **fit_params_last_step)

        return self

class ModelTokenizer(TransformerMixin, BaseEstimator):
    def __init__(self, max_len=100):
        super().__init__()
        self.max_len = max_len 
    def fit(self, X=None, y=None):
        return self  
    def transform(self, X, y=None):
        X_flattened = " ".join(X).split()
        sequences = list() 
        for i in range(self.max_len+1, len(X_flattened)):
            seq = X_flattened[i-self.max_len-1:i]
            sequences.append(seq)
        return sequences 

class ModelEncoder(TransformerMixin, BaseEstimator):
    def __init__(self):
        super().__init__()
        self.tokenizer = Tokenizer()
    def fit(self, X=None, y=None):
        self.tokenizer.fit_on_texts(X)
        return self 
    def transform(self, X, y=None):
        encoded_sequences = np.array(self.tokenizer.texts_to_sequences(X))
        return (encoded_sequences[:,:-1], encoded_sequences[:,-1])

def create_nn(input_shape=(100,1), output_shape=None):
    
    model = Sequential()
    model.add(LSTM(64, input_shape=input_shape, return_sequences=True))
    model.add(Dropout(0.3))
    model.add(Flatten())
    model.add(Dense(20, activation='relu'))
    model.add(Dropout(0.3))
    model.add(Dense(output_shape, activation='softmax'))
    
    # The targets are integer word indices and the output layer is a softmax,
    # so a sparse categorical loss fits here rather than a binary one.
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

pipe = Pipeline([
    ('tokenizer', ModelTokenizer()),
    ('encoder', ModelEncoder()),
    ('model', KerasClassifier(build_fn=create_nn, epochs=10, output_shape=vocab_size)),
])

# Question: how to pass 'vocab_size'?

Imports:

import numpy as np
import tensorflow as tf
from sklearn import pipeline
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.utils import _print_elapsed_time
from sklearn.utils.validation import check_memory
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Flatten
from keras.preprocessing.text import Tokenizer
from scikeras.wrappers import KerasClassifier

Answer:

KerasClassifier has its own internal transformer, which it uses for one-hot encoding and similar data preparation. That transformer has an API for passing metadata to the model; this is how arguments such as n_outputs_ reach the model-building function. Could you override it to pass this extra metadata to the model? That steps a bit outside the Scikit-Learn API, but as you've noted, the Scikit-Learn API doesn't have this functionality built in.

If you want to propagate the information from a transformer in your pipeline into SciKeras, you could encode it as an extra feature, then use the hooks mentioned above with a custom encoder that removes that feature and converts it into metadata to be passed to the model. At that point, though, you would really be pushing the limits of the Scikit-Learn API.
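A minimal, library-free sketch of the "encode it into a feature, then strip it" idea. Everything named here (`MetadataStrippingEncoder`, `append_vocab_size`, the `vocab_size_` key) is hypothetical. The sketch assumes the wrapper collects a dict from a `get_metadata()` method on its encoder; in real use the class would presumably subclass sklearn's BaseEstimator and TransformerMixin and be plugged in through the wrapper's encoder hook, and the exact wiring should be checked against the SciKeras documentation.

```python
class MetadataStrippingEncoder:
    """Hypothetical feature encoder: the last column of X carries vocab_size."""

    def fit(self, X, y=None):
        # Every row carries the same value, so reading the first row suffices.
        self.vocab_size_ = int(X[0][-1])
        return self

    def transform(self, X):
        # Drop the carrier column so the network only sees real features.
        return [list(row[:-1]) for row in X]

    def fit_transform(self, X, y=None):
        return self.fit(X, y).transform(X)

    def get_metadata(self):
        # Assumed hook: the wrapper gathers this dict and forwards it to
        # the model-building function.
        return {"vocab_size_": self.vocab_size_}


def append_vocab_size(X, vocab_size):
    """What the upstream pipeline step would do: add a constant column."""
    return [list(row) + [vocab_size] for row in X]


X = [[3, 1, 4], [1, 5, 9]]
X_tagged = append_vocab_size(X, vocab_size=10)

encoder = MetadataStrippingEncoder()
X_clean = encoder.fit_transform(X_tagged)
meta = encoder.get_metadata()
```

The cost of this design is that the extra column travels through every intermediate step, so any transformer between the encoder and the model has to leave it untouched.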