I have a simple Cassandra table like:
CREATE TABLE my_keyspace.my_table (
my_composite_pk_a bigint,
my_composite_pk_b ascii,
value blob,
PRIMARY KEY ((my_composite_pk_a, my_composite_pk_b))
) WITH bloom_filter_fp_chance = 0.1
AND gc_grace_seconds = 86400
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'enabled': 'true'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};
Note that value is a BLOB, about 1MB each.
I have a Spark app as simple as:
// imports used by the column expressions below
import org.apache.spark.sql.functions.{col, length}

spark
// read data from parquet
.read.parquet("...")
// skip large partitions, to avoid overwhelming Cassandra
.withColumn("bytes_count",length(col("value")))
.filter("bytes_count < 1000000") // < 1MB
// project
.select("my_composite_pk_a", "my_composite_pk_b", "value")
// commit to Cassandra
.writeTo("cassandra.my_keyspace.my_table")
.append()
The following properties are used to configure the Spark Cassandra connector:
spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes=6
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows=1
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key=none
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec=6
spark.sql.catalog.cassandra.spark.cassandra.connection.host=node1,node2
spark.sql.catalog.cassandra.spark.cassandra.connection.port=9042
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level=LOCAL_QUORUM
spark.sql.catalog.cassandra.spark.cassandra.output.metrics=false
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS=90000
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count=100
spark.sql.catalog.cassandra=com.datastax.spark.connector.datasource.CassandraCatalog
spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
spark.sql.catalog.cassandra.spark.cassandra.auth.username=USERNAME
spark.sql.catalog.cassandra.spark.cassandra.auth.password=PASSWORD
The relevant Spark properties are:
--total-executor-cores 6
--executor-cores 6
--executor-memory 15G
--driver-memory 6G
--driver-cores 4
So there is a single executor with 6 cores.
The following error appears on a task at runtime, so the whole Spark app crashes:
...
24/03/06 10:07:24 WARN TaskSetManager: Lost task 886.0 in stage 3.0 (TID 2959, node1, executor 0): java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
...
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply from node4:40201 in 10 seconds
...
24/03/06 10:07:24 ERROR AppendDataExec: Data source write support CassandraBulkWrite(
org.apache.spark.sql.SparkSession@2e8eafb2,
com.datastax.spark.connector.cql.CassandraConnector@1a865ebf,
TableDef(
my_keyspace,
my_table,
ArrayBuffer(
ColumnDef(
my_composite_pk_a,
PartitionKeyColumn,
BigIntType
),
ColumnDef(
my_composite_pk_b,
PartitionKeyColumn,
AsciiType
)
),
ArrayBuffer(),
Stream(
ColumnDef(
my_value,
RegularColumn,
BlobType
)
),
Stream(),
false,
false,
Map()
),
WriteConf(
RowsInBatch(1),
1000,
None,
LOCAL_QUORUM,
false,
false,
6,
Some(6.0),
TTLOption(DefaultValue),
TimestampOption(DefaultValue),false,None),
StructType(StructField(snapshot,LongType,true),
StructField(data_key,StringType,true),
StructField(value,BinaryType,true)
),
org.apache.spark.SparkConf@3903cfc9
)
It seems there is a write error with the Cassandra connector. Do you have an idea why?
1MB BLOBs with a batch size of only 1 row can put stress on Cassandra. Consider increasing spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows (currently 1) for more efficient writes. Please note that extremely large batch sizes could lead to memory issues, so start with a moderate increase and experiment.
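For example, a moderate increase could look like this (the value below is only an illustration to start experimenting with, not a tuned recommendation for your cluster):

spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows=4

Alternatively, leave batch.size.rows unset and control batching by size through spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.bytes, which, to my understanding, the connector falls back to when no explicit row count is set. Either way, increase gradually and watch Cassandra for write timeouts or batch-size warnings while you test.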