AssertionError while training distributed LdaModel in Gensim


I've been trying to fit a topic model with LdaModel() on a large dataset (~17 million entries). I'm running my code on my university's HPC cluster and have been running into issues when I try to train the model in distributed mode (distributed=True). Right now I'm just trying to run on 4 cores with a small subset of the data, but I'll probably need to increase the number of cores when I run on the entire dataset. Here is the error:

https://i.stack.imgur.com/xiaTV.png

When I run in serial mode (distributed=False), it seems to work fine. When running distributed, Gensim does appear to start training with all 4 workers; the callback log contains the line "2021-04-26 18:32:05,757:INFO:using distributed version with 4 workers". I'm using recent versions of everything: Gensim 4.0, Pyro4 4.8, and Python 3.7.7. I also tried downgrading Gensim to 3.8.3, but that didn't seem to help.

Also worth noting that I'm running the python script from a shell script, in which I've included the following lines of code as suggested by the gensim documentation:

# Gensim's distributed LDA needs Pyro to allow pickle serialization
export PYRO_SERIALIZERS_ACCEPTED=pickle
export PYRO_SERIALIZER=pickle

# Start the Pyro4 nameserver, then 4 LDA workers and the dispatcher
python -m Pyro4.naming -n 0.0.0.0 &

for i in {1..4}
do
    python -m gensim.models.lda_worker &
    echo "$i"
done
python -m gensim.models.lda_dispatcher &
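
To confirm the dispatcher and workers actually register, here's a quick check I can run after the script above (a minimal sketch; it assumes the nameserver started above is reachable through Pyro4's default lookup):

import Pyro4

# List everything registered on the Pyro4 nameserver;
# the gensim dispatcher and worker entries should show up here.
ns = Pyro4.locateNS()
for name, uri in ns.list().items():
    print(name, uri)

If no dispatcher/worker entries show up in that listing, the training script would have nothing to connect to.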

Here's most of the python script that I'm trying to run:

import pandas as pd
from gensim.corpora import Dictionary
from gensim.models import LdaModel
from gensim.models.callbacks import CoherenceMetric

# Import preprocessed text
df = pd.read_feather('narratives_processed_1.feather')
# Sample down to speed up debugging
df = df.loc[1:4000]

# Create dictionary from text
dictionary = Dictionary(df.FOI_TEXT)
dictionary.filter_extremes(no_below=100, no_above=0.5)
dictionary.compactify()
# Create corpus
corpus = [dictionary.doc2bow(doc) for doc in df.FOI_TEXT]
print('Number of unique tokens: %d' % len(dictionary))
print('Number of documents: %d' % len(corpus))
temp = dictionary[0]
id2word = dictionary.id2token

# Callback logging for coherence using the u_mass metric
coherence_umass_logger = CoherenceMetric(corpus=corpus, logger='shell', coherence='u_mass')

filename = "model_callbacks_1.log"
import logging
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
logging.basicConfig(filename = filename,
                    format="%(asctime)s:%(levelname)s:%(message)s",
                    level=logging.INFO)

#Iterate over model parameters
iterations = [10,50,100]
num_topics = [2,3]
passes = 2

all_metrics = pd.DataFrame()

print('Fitting models...')
for iteration in iterations:
    print('Iterations for this model: %d'%(iteration))
    for num_topic in num_topics:
        print('Topics for this model: %d'%(num_topic))
        
        # Create model
        model = LdaModel(corpus=corpus,
                 num_topics=num_topic,
                 id2word=id2word,
                 eval_every=0,
                 passes=passes,
                 iterations=iteration,
                 chunksize=1000,
                 random_state=100,
                 callbacks=[coherence_umass_logger],
                 distributed = True)
            
        df_temp = pd.DataFrame.from_dict(model.metrics)
        df_temp['iterations'] = iteration
        df_temp['topics'] = num_topic
    
        all_metrics = pd.concat([all_metrics, df_temp])
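
In case it helps narrow things down, this is a minimal version of the distributed call I'm trying to get working (a sketch using the corpus and id2word built above, with the callback and metric collection removed):

# Minimal distributed call, no callbacks or metric collection
model = LdaModel(corpus=corpus,
                 num_topics=2,
                 id2word=id2word,
                 eval_every=0,
                 passes=2,
                 iterations=10,
                 chunksize=1000,
                 random_state=100,
                 distributed=True)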

Any help on this error is much appreciated!
