Pandas on Spark 3.2 - nlp.pipe - pd.Series.__iter__() is not implemented


I'm currently trying to migrate some processes from Python to (pandas on) Spark to measure performance. Everything went well until this point:

df_info is a pyspark.pandas DataFrame

nlp is defined as: nlp = spacy.load('es_core_news_sm', disable=["tagger", "parser"])

def lemmatize_pipe(doc):
    lemma_list = [str(tok.lemma_).lower() for tok in doc] 
    return lemma_list

def preprocess_pipe(texts):
    preproc_pipe = []
    for doc in nlp.pipe(texts, batch_size=20):
        preproc_pipe.append(lemmatize_pipe(doc))
    return preproc_pipe

df_info['text_2'] = preprocess_pipe(df_info['text'])

I get this error on the line for doc in nlp.pipe(texts, batch_size=20):

PandasNotImplementedError: The method pd.Series.__iter__() is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

Referenced here:

/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb93782f-db7f-4f94-97bf-409a980a51f7/lib/python3.8/site-packages/spacy/language.py in pipe(self, texts, as_tuples, batch_size, disable, component_cfg, n_process)
   1570         else:
   1571             # if n_process == 1, no processes are forked.
-> 1572             docs = (self._ensure_doc(text) for text in texts)
   1573             for pipe in pipes:

Any idea how I can solve this?

---- EDIT ---- Converting to NumPy:

df_info['text_2'] = preprocess_pipe(df_info['text'].to_numpy())

changes the error to:

AssertionError                            Traceback (most recent call last)
<command-1319548240307049> in <module>
      9     return preproc_pipe
     10 
---> 11 df_info['text'] = preprocess_pipe(df_info['text'].to_numpy())


/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    192             start = time.perf_counter()
    193             try:
--> 194                 res = func(*args, **kwargs)
    195                 logger.log_success(
    196                     class_name, function_name, time.perf_counter() - start, signature

/databricks/spark/python/pyspark/pandas/frame.py in __setitem__(self, key, value)
  11812             ):
  11813                 psdf = self.reset_index()
> 11814                 psdf[key] = ps.DataFrame(value)
  11815                 psdf = psdf.set_index(psdf.columns[: self._internal.index_level])
  11816                 psdf.index.names = self.index.names

/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    187         if hasattr(_local, "logging") and _local.logging:
    188             # no need to log since this should be internal call.
--> 189             return func(*args, **kwargs)
    190         _local.logging = True
    191         try:

/databricks/spark/python/pyspark/pandas/frame.py in __setitem__(self, key, value)
  11799                         yield (psdf._psser_for(this_label), this_label)
  11800 
> 11801             psdf = align_diff_frames(assign_columns, self, value, fillna=False, how="left")
  11802         elif isinstance(value, list):
  11803             if len(self) != len(value):

/databricks/spark/python/pyspark/pandas/utils.py in align_diff_frames(resolve_func, this, that, fillna, how, preserve_order_column)
    421     # it adds new columns for example.
    422     if len(this_columns_to_apply) > 0 or len(that_columns_to_apply) > 0:
--> 423         psser_set, column_labels_set = zip(
    424             *resolve_func(combined, this_columns_to_apply, that_columns_to_apply)
    425         )

/databricks/spark/python/pyspark/pandas/frame.py in assign_columns(psdf, this_column_labels, that_column_labels)
  11789                 psdf: DataFrame, this_column_labels: List[Label], that_column_labels: List[Label]
  11790             ) -> Iterator[Tuple["Series", Label]]:
> 11791                 assert len(key) == len(that_column_labels)
  11792                 # Note that here intentionally uses `zip_longest` that combine
  11793                 # that_columns.

1 Answer

Fundamentally this looks like a type error that only gets caught when spaCy tries to iterate over texts in nlp.pipe() (line 1572 of spacy/language.py, as you cited).

texts must be an iterable of strings (a list, a tuple, anything implementing __iter__). df_info['text'] is instead a pandas-on-Spark Series, which deliberately does not implement __iter__ because iterating it would collect every row back to the driver.

You may need to use pd.Series.values, pd.Series.array, or a call to pd.Series.to_numpy(), as the error you shared suggests; note that each of these materializes the column on the driver first:

df_info['text_2'] = preprocess_pipe(df_info['text'])  # original
# df_info['text_2'] = preprocess_pipe(df_info['text'].values)
# df_info['text_2'] = preprocess_pipe(df_info['text'].array)
# df_info['text_2'] = preprocess_pipe(df_info['text'].to_numpy())
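
That said, the AssertionError from your edit looks like a downstream symptom rather than a to_numpy() problem: when you assign the returned list of lists back, pyspark.pandas wraps it in ps.DataFrame(value) (see frame.py line 11814 in your traceback), which spreads each row's lemma list across separate columns, so the single key can no longer be aligned with them. A minimal sketch that sidesteps this, assuming df_info is small enough to fit in driver memory, is to round-trip through plain pandas, where a list-per-row object column is fine:

import pyspark.pandas as ps

# Collect the whole frame to the driver as plain pandas
pdf = df_info.to_pandas()

# A plain pandas Series implements __iter__, so nlp.pipe() accepts it,
# and pandas stores one lemma list per row as an object column
pdf['text_2'] = preprocess_pipe(pdf['text'])

# Convert back to a pandas-on-Spark frame
df_info = ps.from_pandas(pdf)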

If you can share a sample of df_info, as Ric S asked in the comments, you may get stronger answers as well.
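
If collecting everything to the driver defeats the purpose of the migration, a more Spark-friendly sketch (untested against your setup, and it trades nlp.pipe()'s batching for per-row calls that run distributed on the executors) is to lemmatize through Series.apply; the model name and disabled components below just follow your question:

import spacy

# Loaded on the driver; Spark serializes the closure (including nlp)
# and ships it to the executors
nlp = spacy.load('es_core_news_sm', disable=["tagger", "parser"])

def lemmatize_text(text):
    # Process one string at a time and return that row's lemma list
    return [str(tok.lemma_).lower() for tok in nlp(text)]

df_info['text_2'] = df_info['text'].apply(lemmatize_text)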