I'm currently trying to migrate some processes from Python to (pandas on) Spark to measure performance. Everything went well until this point:
df_info is a pyspark.pandas DataFrame.
nlp is defined as:
nlp = spacy.load('es_core_news_sm', disable=["tagger", "parser"])
def lemmatize_pipe(doc):
    lemma_list = [str(tok.lemma_).lower() for tok in doc]
    return lemma_list

def preprocess_pipe(texts):
    preproc_pipe = []
    for doc in nlp.pipe(texts, batch_size=20):
        preproc_pipe.append(lemmatize_pipe(doc))
    return preproc_pipe
df_info['text_2'] = preprocess_pipe(df_info['text'])
I got this error on the line for doc in nlp.pipe(texts, batch_size=20):
PandasNotImplementedError: The method pd.Series.__iter__() is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.
Referenced here:
/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb93782f-db7f-4f94-97bf-409a980a51f7/lib/python3.8/site-packages/spacy/language.py in pipe(self, texts, as_tuples, batch_size, disable, component_cfg, n_process)
1570 else:
1571 # if n_process == 1, no processes are forked.
-> 1572 docs = (self._ensure_doc(text) for text in texts)
1573 for pipe in pipes:
Any idea of how I can solve this?
----EDIT---- Converting to NumPy:
df_info['text_2'] = preprocess_pipe(df_info['text'].to_numpy())
changes the error to:
AssertionError Traceback (most recent call last)
<command-1319548240307049> in <module>
9 return preproc_pipe
10
---> 11 df_info['text'] = preprocess_pipe(df_info['text'].to_numpy())
/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args, **kwargs)
192 start = time.perf_counter()
193 try:
--> 194 res = func(*args, **kwargs)
195 logger.log_success(
196 class_name, function_name, time.perf_counter() - start, signature
/databricks/spark/python/pyspark/pandas/frame.py in __setitem__(self, key, value)
11812 ):
11813 psdf = self.reset_index()
> 11814 psdf[key] = ps.DataFrame(value)
11815 psdf = psdf.set_index(psdf.columns[: self._internal.index_level])
11816 psdf.index.names = self.index.names
/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args, **kwargs)
187 if hasattr(_local, "logging") and _local.logging:
188 # no need to log since this should be internal call.
--> 189 return func(*args, **kwargs)
190 _local.logging = True
191 try:
/databricks/spark/python/pyspark/pandas/frame.py in __setitem__(self, key, value)
11799 yield (psdf._psser_for(this_label), this_label)
11800
> 11801 psdf = align_diff_frames(assign_columns, self, value, fillna=False, how="left")
11802 elif isinstance(value, list):
11803 if len(self) != len(value):
/databricks/spark/python/pyspark/pandas/utils.py in align_diff_frames(resolve_func, this, that, fillna, how, preserve_order_column)
421 # it adds new columns for example.
422 if len(this_columns_to_apply) > 0 or len(that_columns_to_apply) > 0:
--> 423 psser_set, column_labels_set = zip(
424 *resolve_func(combined, this_columns_to_apply, that_columns_to_apply)
425 )
/databricks/spark/python/pyspark/pandas/frame.py in assign_columns(psdf, this_column_labels, that_column_labels)
11789 psdf: DataFrame, this_column_labels: List[Label], that_column_labels: List[Label]
11790 ) -> Iterator[Tuple["Series", Label]]:
> 11791 assert len(key) == len(that_column_labels)
11792 # Note that here intentionally uses `zip_longest` that combine
11793 # that_columns.
----ANSWER----
It seems to fundamentally be a type error that only gets caught when trying to iterate over texts in nlp.pipe() (line 1572 of spacy/language.py, as you cited). texts must be an iterable (a list, a tuple, anything implementing __iter__) of strings. df_info['text'] instead is a pyspark.pandas Series, which deliberately does not implement __iter__ — that is exactly what the PandasNotImplementedError is telling you. You may need to use pd.Series.values, pd.Series.array, or make a call to pd.Series.to_numpy(), as the error you shared suggests.
If you can share a sample of df_info, as Ric S asked in the comments, you may get stronger answers as well.
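Note, though, that converting to an iterable only fixes the first error. Your second traceback suggests the failure has moved to assigning the list of lemma lists back into the frame: pyspark.pandas wraps the value in ps.DataFrame(value), and a list of variable-length lists becomes a frame with many columns that cannot be aligned to the single key (hence assert len(key) == len(that_column_labels)). Below are two hedged sketches, not definitive implementations. The first collects everything to the driver and works in plain pandas; it assumes df_info fits in driver memory, which gives up Spark's distribution but is a quick way to validate the pipeline:

import pyspark.pandas as ps

pdf = df_info.to_pandas()                     # plain pandas DataFrame on the driver
pdf['text_2'] = preprocess_pipe(pdf['text'])  # a pandas Series is iterable, so nlp.pipe accepts it
df_info = ps.from_pandas(pdf)                 # back to pandas-on-Spark

The second keeps the work on the executors via Series.apply(). It assumes the es_core_news_sm model is installed on every worker node; lemmatize_text is a hypothetical helper, and loading the model lazily inside it avoids shipping the pickled model with the closure:

_nlp = None

def lemmatize_text(text):
    # Load the spaCy model once per worker process instead of
    # serializing it along with the function.
    global _nlp
    if _nlp is None:
        import spacy
        _nlp = spacy.load('es_core_news_sm', disable=["tagger", "parser"])
    return [str(tok.lemma_).lower() for tok in _nlp(text)]

df_info['text_2'] = df_info['text'].apply(lemmatize_text)

Note that this trades nlp.pipe's batching for one nlp() call per row, so it is a sketch for correctness rather than a tuned solution; measuring both approaches against your plain-Python baseline would show which actually wins.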