Spark with Scala: write null-like field value in Cassandra instead of TupleValue


In one of my collections let's say I have the following field:

f: frozen<tuple<text, set<text>>>

Let's say I want to insert an entry where this particular field is empty (null, non-existent, etc.) from a Scala script. Before inserting, I map the entry's field like this:

sRow("fk") = null // or None, or maybe I simply don't specify the field at all

When I try to run the Spark script (from Databricks, Spark Cassandra Connector version 1.6) I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 133.0 failed 1 times, most recent failure: Lost task 6.0 in stage 133.0 (TID 447, localhost): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object null to com.datastax.spark.connector.TupleValue.
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:47)
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:43)

When using None instead of null I still get an error, though a different one:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 143.0 failed 1 times, most recent failure: Lost task 2.0 in stage 143.0 (TID 474, localhost): java.lang.IllegalArgumentException: requirement failed: Expected 2 components, instead of 0
    at scala.Predef$.require(Predef.scala:233)
    at com.datastax.spark.connector.types.TupleType.newInstance(TupleType.scala:55)

I understand that Cassandra doesn't have an exact notion of null, but I know there is a way to leave values out when inserting entries into Cassandra, as I have done this from other environments, such as the Node.js driver for Cassandra. How can I force a null-like value when inserting into a column that expects a TupleValue, or perhaps a user-defined type?


1 Answer


With modern versions of Cassandra you can use the "unset" feature to have it actually skip nulls. This is probably best for your use case, because writing a null implicitly writes a tombstone.

See Treating nulls as Unset

// Connector implicits (saveToCassandra/cassandraTable) and WriteConf
import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.WriteConf

// Set up original data: (1, 1, 1) --> (6, 6, 6); ks is the target keyspace name
sc.parallelize(1 to 6).map(x => (x, x, x)).saveToCassandra(ks, "tab1")

// Build a WriteConf that treats nulls as unset
val ignoreNullsWriteConf = WriteConf.fromSparkConf(sc.getConf).copy(ignoreNulls = true)

// These writes will not delete anything because we are ignoring nulls
sc.parallelize(1 to 6)
  .map(x => (x, None, None))
  .saveToCassandra(ks, "tab1", writeConf = ignoreNullsWriteConf)

val results = sc.cassandraTable[(Int, Int, Int)](ks, "tab1").collect

results
/**
  (1, 1, 1),
  (2, 2, 2),
  (3, 3, 3),
  (4, 4, 4),
  (5, 5, 5),
  (6, 6, 6)
**/

There are also much finer-grained controls available; see the full docs.
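
If you want per-value control instead of a global write setting, newer connector versions also expose a CassandraOption type that can mark an individual value as unset (skip the column) or as an explicit null (write a tombstone). A minimal sketch, assuming CassandraOption is available under com.datastax.spark.connector.types in your connector version:

import com.datastax.spark.connector._
// Assumed import path; check the docs for your connector version
import com.datastax.spark.connector.types.CassandraOption

// Unset skips the column entirely; Null deliberately writes a tombstone
sc.parallelize(1 to 6)
  .map(x => (x, CassandraOption.Unset, CassandraOption.Null))
  .saveToCassandra(ks, "tab1")

Applied to the original question, marking the frozen tuple column as Unset should leave it out of the insert entirely, rather than asking the connector to convert null/None into a TupleValue.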