I am trying to execute multi-process to pull the data from Cassandra. But, I'm facing the issue.I want to pull it for single key or multiple keys using the multi-process provided my Cassandra My cassandra_db class
from cassandra.cluster import Cluster
import cassandra
import pandas as pd
import numpy as np
from datetime import datetime
import sys
import os
from threading import Event
import itertools
from multiprocessing import Pool
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import tuple_factory
ip_address = '127.0.0.1'
class cassandra_db(object):
concurrency = 2 # chosen to match the default in execute_concurrent_with_args
def __init__(self,process_count=None):
self.pool = Pool(processes=process_count, initializer=self._setup)
@classmethod
def _setup(cls):
cls.session = Cluster([ip_address]).connect(keyspace='test')
cls.session.row_factory = pandas_factory
cls.prepared = cls.session.prepare('SELECT * FROM tr_test WHERE key=?')
def close_pool(self):
self.pool.close()
self.pool.join()
def get_results(self, params):
try:
xrange
except NameError:
xrange = range
params = list(params)
print("-----> ",params)
print("-----+>",self.concurrency)
self.pool.map(_multiprocess_get, (params[n:n + self.concurrency] for n in xrange(0, len(params), self.concurrency)))
@classmethod
def _results_from_concurrent(cls, params):
return execute_concurrent_with_args(cls.session, cls.prepared, params)
def _multiprocess_get(params):
return cassandra_db._results_from_concurrent(params)
My calling class
import os
import pandas as pd
import sys
relative_path='/home/anji'
sys.path.append(os.path.join(relative_path ,'commons','Database Operations'))
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra_db import cassandra_db
from cassandra.policies import ConstantReconnectionPolicy
processes =2
con_db = cassandra_db(processes)
keys=[(1,),(2,)]
df = con_db.get_results(keys)
print("Result",df.head())
Error:
multiprocessing.pool.MaybeEncodingError: Error sending result: '[[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x7fa93658bbe0>), ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x7fa936a2e0f0>)]]'. Reason: 'PicklingError("Can't pickle <class 'importlib._bootstrap.ExecutionResult'>: attribute lookup ExecutionResult on importlib._bootstrap failed",)'
My trying to execute for 2 keys but facing the issue. Can any help me to solve this issue