TypeError: Cannot recognize a pipeline stage of type <class 'function'>


Can I combine Spark NLP with PySpark? I have data (tweets) consisting of two categorical features, "keyword" and "location", and one free-text field, "text". I am trying to build sentence embeddings using GoogleUniversalSentenceEncoder and add two one-hot encoders. This is my code:

from pyspark.sql.functions import udf, col
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.linalg import DenseVector, VectorUDT
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import UniversalSentenceEncoder

# Define the pipeline stages
stages_one_hot = []

# Define the categorical columns
categorical_cols = ["keyword", "location"]

for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col+"_index")
    encoder = OneHotEncoder(inputCol=col+"_index", outputCol=col+"_vec")
    stages_one_hot += [indexer, encoder]

stages_text = []
# Add the DocumentAssembler stage
documentAssembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
stages_text += [documentAssembler]

# Add the UniversalSentenceEncoder stage
encoder = UniversalSentenceEncoder.pretrained().setInputCols(["document"]).setOutputCol("sentence_embedding")
stages_text += [encoder]

# Define the UDF to convert the sentence_embedding column to DenseVector type
# denseVectorizer = udf(lambda x: DenseVector(x.toArray()), VectorUDT())

# Add the DenseVector conversion stage
# denseVector = denseVectorizer(col("sentence_embedding")).alias("sentence_embedding_dense")
denseVectorizer = udf(lambda x: DenseVector(x.toArray()), VectorUDT())
stages_text += [denseVectorizer]

stages_together = []
# Add the VectorAssembler stage
assembler = VectorAssembler(inputCols=[col+"_vec" for col in categorical_cols] + ["sentence_embedding_dense"],
                            outputCol="features")
stages_together += [assembler]

# Add the LogisticRegression stage with the target column as the label
lr = LogisticRegression(featuresCol="features", labelCol="target")
stages_together += [lr]

# Create the pipeline
pipeline = Pipeline(stages=stages_one_hot + stages_text + stages_together)

df = trainDataset

# Fit the pipeline to the data
pipelineModel = pipeline.fit(df)

# Apply the pipeline to the data and get the predictions
transformed_df = pipelineModel.transform(df)

However, I always get this error:

TypeError: Cannot recognize a pipeline stage of type <class 'function'>.

I have googled extensively and also asked ChatGPT, but I always end up at the same point. Is it possible to do this feature extraction and build the model?

Thanks in advance.

1 Answer

Answered by Eli Borodach:
Problem solved. The issue was the output type of GoogleUniversalSentenceEncoder: it is not a vector, as I expected, but a one-element array of annotation structs, each containing metadata plus the embedding values. The following UDF therefore solves the problem:
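For intuition, here is a plain-Python sketch of that annotation structure. The field layout follows Spark NLP's annotation schema; the concrete values below are illustrative, not real encoder output:

```python
# A Spark NLP annotation struct has the fields
# (annotatorType, begin, end, result, metadata, embeddings),
# so field index 5 holds the embedding values. That is why the UDF
# below reads r[0][5]: r is a one-element array of such structs.
annotation = (
    "sentence_embeddings",   # 0: annotatorType
    0,                       # 1: begin
    34,                      # 2: end
    "some tweet text",       # 3: result
    {"sentence": "0"},       # 4: metadata
    [0.12, -0.53, 0.98],     # 5: embeddings (illustrative values)
)
row = [annotation]           # the column value: a one-element array

vector_values = row[0][5]
print(vector_values)         # → [0.12, -0.53, 0.98]
```

These are the raw floats the UDF wraps in `Vectors.dense`.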

from pyspark.ml.feature import SQLTransformer
from pyspark.ml.linalg import Vectors, VectorUDT

# Register a UDF that pulls the embedding values (field index 5 of the
# annotation struct) out of the one-element array and wraps them in a
# dense vector.
convert_to_vector_udf = spark.udf.register(
    "convert_to_vector_udf",
    lambda r: Vectors.dense(r[0][5]),
    VectorUDT()
)

# Wrap the UDF in an SQLTransformer so it can live inside the Pipeline.
stages_text += [SQLTransformer(
    statement="SELECT *, convert_to_vector_udf(sentence_embedding) AS sentence_embedding_dense FROM __THIS__")]

Note, however, that ClassifierDLApproach can no longer be used on this column, because it is tuned to consume the classic GoogleUniversalSentenceEncoder() output directly.
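As for the original TypeError: a Pipeline only accepts stages that implement the Estimator/Transformer interface, and udf(...) yields a plain callable, which fit() rejects. A toy stand-in that mimics the check (these are not pyspark's actual classes, just an illustration of the validation logic):

```python
class Transformer:
    """Stand-in for pyspark.ml.Transformer (illustrative only)."""
    def transform(self, df):
        return df

def check_stage(stage):
    # Loosely mirrors Pipeline's stage validation: anything that is not
    # a Transformer (or Estimator) is rejected with the familiar error.
    if not isinstance(stage, Transformer):
        raise TypeError(
            f"Cannot recognize a pipeline stage of type {type(stage)}."
        )

dense_vectorizer = lambda x: x  # stands in for the raw udf(...) object
try:
    check_stage(dense_vectorizer)
except TypeError as e:
    print(e)  # → Cannot recognize a pipeline stage of type <class 'function'>.
```

Wrapping the logic in an SQLTransformer, as above, works precisely because SQLTransformer is a real Transformer.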