Cassandra ExecutionResult on importlib._bootstrap failed

117 Views Asked by At

I am trying to execute multi-process to pull the data from Cassandra. But, I'm facing the issue.I want to pull it for single key or multiple keys using the multi-process provided my Cassandra My cassandra_db class

from cassandra.cluster import Cluster
import cassandra
import pandas as pd
import numpy as np
from datetime import datetime
import sys
import os
from threading import Event
import itertools
from multiprocessing import Pool
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import tuple_factory
ip_address = '127.0.0.1'

class cassandra_db(object):

    concurrency = 2 # chosen to match the default in execute_concurrent_with_args

    def __init__(self,process_count=None):
        self.pool = Pool(processes=process_count, initializer=self._setup)

    @classmethod
    def _setup(cls):
        cls.session = Cluster([ip_address]).connect(keyspace='test')
        cls.session.row_factory = pandas_factory
        cls.prepared = cls.session.prepare('SELECT * FROM tr_test WHERE key=?')

    def close_pool(self):
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        try:
             xrange
        except NameError:
              xrange = range
        params = list(params)
        print("-----> ",params)
        print("-----+>",self.concurrency)
        self.pool.map(_multiprocess_get, (params[n:n + self.concurrency] for n in xrange(0, len(params), self.concurrency)))


    @classmethod
    def _results_from_concurrent(cls, params):
        return execute_concurrent_with_args(cls.session, cls.prepared, params)

def _multiprocess_get(params):
    return cassandra_db._results_from_concurrent(params)

My calling class

    import os
import pandas as pd
import sys
relative_path='/home/anji'
sys.path.append(os.path.join(relative_path ,'commons','Database Operations'))
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra_db import cassandra_db
from cassandra.policies import ConstantReconnectionPolicy
processes =2
con_db = cassandra_db(processes)
keys=[(1,),(2,)]
df = con_db.get_results(keys)
print("Result",df.head())

Error:

multiprocessing.pool.MaybeEncodingError: Error sending result: '[[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x7fa93658bbe0>), ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x7fa936a2e0f0>)]]'. Reason: 'PicklingError("Can't pickle <class 'importlib._bootstrap.ExecutionResult'>: attribute lookup ExecutionResult on importlib._bootstrap failed",)'

My trying to execute for 2 keys but facing the issue. Can any help me to solve this issue

0

There are 0 best solutions below