I am inserting a DataFrame with around 14k rows into a DataStax Astra Cassandra database. I am on the free tier, which gives a 25 MB storage limit, and my dataset is only around 1.5 MB. The code shows no errors on insertion or fetching, but when I count the rows after fetching I get only around 1.5k. Is the problem in the insertion code or the fetching code? I have not been able to figure it out despite racking my brains and searching Google multiple times. Following is my code:
cassandraDBLoad.py
def progressbar(it, prefix="", size=60, out=sys.stdout): # Python3.3+
    count = len(it)
    def show(j):
        x = int(size*j/count)
        print("{}[{}{}] {}/{}".format(prefix, u"█"*x, "."*(size-x), j, count),
              end='\r', file=out, flush=True)
    show(0)
    for i, item in enumerate(it):
        yield item
        show(i+1)
    print("\n", flush=True, file=out)
def cassandraDBLoad(config_path):
    try:
        config = read_params(config_path)
        execution_profile = ExecutionProfile(request_timeout=10)
        cassandra_config = {'secure_connect_bundle': config["connect_cassandra"]["cloud_config"]}
        auth_provider = PlainTextAuthProvider(
            config["connect_cassandra"]["client_id"],
            config["connect_cassandra"]["client_secret"]
        )
        cluster = Cluster(cloud=cassandra_config, auth_provider=auth_provider)
        session = cluster.connect()
        session.default_timeout = None

        connect_db = session.execute("select release_version from system.local")
        set_keyspace = session.set_keyspace(config["cassandra_db"]["keyspace"])

        table_ = config["cassandra_db"]["data_table"]
        define_columns = config["cassandra_db"]["define_columns"]
        create_table = f"CREATE TABLE IF NOT EXISTS {table_}({define_columns});"
        start_create = time.process_time()
        table_result = session.execute(create_table)

        train = pd.read_csv(config["data_source"]["train_source"])
        test = pd.read_csv(config["data_source"]["test_source"])

        # Combine test and train into one file
        train['source'] = 'train'
        test['source'] = 'test'
        df = pd.concat([train, test], ignore_index=True)
        df = df.fillna('NA')

        columns = list(df)
        for col in columns:
            df[col] = df[col].map(str)

        columns = config["cassandra_db"]["columns"]
        insert_qry = f"INSERT INTO {table_}({columns}) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) IF NOT EXISTS"
        statement = session.prepare(insert_qry)

        start_insert = time.process_time()
        batch = BatchStatement()
        for i in progressbar(range(len(df)), "Inserting: ", 40):
            time.sleep(0.1)
            session.execute_async(
                statement,
                [
                    df.iat[i,0], df.iat[i,1], df.iat[i,2], df.iat[i,3], df.iat[i,4], df.iat[i,5],
                    df.iat[i,6], df.iat[i,7], df.iat[i,8], df.iat[i,9], df.iat[i,10], df.iat[i,11],
                    df.iat[i,12]
                ]
            )
        print("Time taken to insert df - " + str((time.process_time() - start_insert)/60) + " minutes")
    except Exception as e:
        raise Exception("(cassandraDBLoad): Something went wrong in the CassandraDB Load operations\n" + str(e))
The above code takes around 30 minutes to insert the rows. I have 12 GB of RAM and 2 CPU cores.
preprocess_data.py
def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

def preprocess_data(config_path):
    try:
        config = read_params(config_path)
        cassandra_config = {'secure_connect_bundle': config["connect_cassandra"]["cloud_config"]}
        auth_provider = PlainTextAuthProvider(
            config["connect_cassandra"]["client_id"],
            config["connect_cassandra"]["client_secret"]
        )
        cluster = Cluster(cloud=cassandra_config, auth_provider=auth_provider)
        session = cluster.connect()
        session.set_keyspace(config["cassandra_db"]["keyspace"])
        session.row_factory = pandas_factory
        #session.default_fetch_size = None

        count_query = f"SELECT COUNT(*) from {config['cassandra_db']['data_table']} LIMIT 20000"
        count_rslt = session.execute(count_query, timeout=None)
        print(count_rslt._current_rows)

        query = f"SELECT * from {config['cassandra_db']['data_table']}"
        simple_statement = SimpleStatement(query, consistency_level=ConsistencyLevel.ONE, fetch_size=None)
        execute_result = session.execute(simple_statement, timeout=None)
        data = execute_result._current_rows
        print(data.shape)
    except Exception as e:
        raise Exception("(preprocessData): " + str(e))
CSV files link - https://drive.google.com/drive/folders/1O03lNTMfSwhUKG61zOs7fNxXIRe44GRp?usp=sharing
Kindly help me either insert the full dataframe or fetch all the rows, depending on where the issue lies.
My best guess is that there is a mismatch between what you think you are counting, and the actual rows in the table.
I say it's a "guess" because it is impossible to know what you are doing without knowing the table's CQL schema. Let me illustrate with some examples.
For a table with a simple primary key, each partition can have exactly one row. For example:
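(A hypothetical schema; the users table and its columns are made up for illustration and are not taken from your config.)
CREATE TABLE users (
    email text,
    name  text,
    PRIMARY KEY (email)
);
Here email alone is the primary key, so every user is its own partition containing exactly one row.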
If there were 500 users in the table, this query will also return a count of 500:
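(The concrete query below is illustrative, run against the hypothetical users table above.)
SELECT COUNT(*) FROM users;
Every partition contributes exactly one row, so the count matches the number of users.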
For a table with a compound primary key (composed of partition key + at least one clustering key), each partition can have ONE OR MORE rows. For example:
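(Another hypothetical sketch, this time with username as the partition key and email as a clustering column.)
CREATE TABLE user_emails (
    username   text,
    email      text,
    email_type text,
    PRIMARY KEY (username, email)
);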
A user (partition) can have one or more (rows of) emails -- personal, work, etc. If there were 500 users in the user_emails table, a query over the distinct partition keys will simply return the number of partitions (users), despite each partition having any number of rows:
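(Illustrative again; in CQL, DISTINCT can only be applied to partition key columns.)
SELECT DISTINCT username FROM user_emails;
This returns one result per partition -- 500 in this example -- regardless of how many email rows each user has.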
As a side note, counting rows in Cassandra isn't the same thing as counting records in traditional relational databases. I've explained it in detail in this DBA Stack Exchange post -- Why is COUNT() bad in Cassandra?
As I mentioned in your other post, if you have to count records then use the DataStax Bulk Loader (DSBulk), which was developed for this purpose. DSBulk has a nice feature for counting data in large tables in a distributed manner. It is open-source, so Apache Cassandra users can use it for free. Cheers!
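For reference, a DSBulk count against an Astra database looks something like this (my_keyspace and my_table are placeholders, -b points at your secure connect bundle, and -u/-p take the client ID and secret):
dsbulk count -k my_keyspace -t my_table -b /path/to/secure-connect.zip -u client_id -p client_secret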