Spark Cassandra SQL can't perform DataFrame methods on query results


So I have a Spark-Cassandra cluster that I am trying to execute SQL queries on. I build a jar with sbt assembly, then I submit it with spark-submit. This works fine when I am not using Spark SQL. When I use Spark SQL I get an error; below is the output:

2
CassandraRow{key: key1, value: 1}
3.0
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.trees.LeafNode$class.children(Lorg/apache/spark/sql/catalyst/trees/LeafNode;)Lscala/collection/Seq;
    at org.apache.spark.sql.cassandra.CassandraTableScan.children(CassandraTableScan.scala:19)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5$$anonfun$apply$6.apply(TreeNode.scala:280)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:279)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenUp(TreeNode.scala:292)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:247)
    at org.apache.spark.sql.execution.AddExchange.apply(Exchange.scala:128)
    at org.apache.spark.sql.execution.AddExchange.apply(Exchange.scala:124)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:1085)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:1085)
    at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:889)
    at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:797)
    at CassSparkTest$.main(CassSparkTest.scala:22)
    at CassSparkTest.main(CassSparkTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
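
For reference, the build-and-submit flow mentioned above looks roughly like this (the assembly jar path is a guess based on sbt-assembly's default naming for a Scala 2.10 build; the master URL is hard-coded in the job below, so it is not repeated on the command line):

sbt assembly
spark-submit --class CassSparkTest target/scala-2.10/CassSparkTest-assembly-1.0.jar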

Here is the Scala code for the job; it's very simple:

import org.apache.spark.{SparkContext, SparkConf}
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.sql._

object CassSparkTest {
  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext("spark://192.168.10.11:7077", "test", conf)

    // Plain RDD access through the connector -- all of this works.
    val rdd = sc.cassandraTable("test", "kv")
    println(rdd.count)
    println(rdd.first)
    println(rdd.map(_.getInt("value")).sum)

    // Spark SQL access -- the stack trace points at the foreach below
    // (CassSparkTest.scala:22).
    val sqlC = new CassandraSQLContext(sc)

    val sqlText = "SELECT * FROM test.kv"
    val df = sqlC.sql(sqlText)
    df.show()
    df.foreach(println)
  }
}

As you can see, Spark successfully created an RDD with sc.cassandraTable("test", "kv") and was able to get the count, the first value, and the sum.

When I run the same query directly in cqlsh (instead of through Spark SQL), this is the result I get:

cqlsh> select * from test.kv;

 key  | value
------+-------
 key1 |     1
 key2 |     2

(2 rows)
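
The table DDL isn't shown here, but a schema consistent with this output (and with the getInt("value") call above) would be something like:

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
);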

Here is the build.sbt file. A fat jar containing spark-cassandra-connector is held in the lib folder, so sbt automatically adds it to the classpath as an unmanaged dependency (I don't think the build file is the problem, considering I have successfully created an RDD based on a C* table and used methods on it):

lazy val root = (project in file(".")).
  settings(
    name := "CassSparkTest",
    version := "1.0"
  )

libraryDependencies ++= Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5" % "provided",
  "org.apache.cassandra" % "cassandra-thrift" % "2.1.5" % "provided",
  "org.apache.cassandra" % "cassandra-clientutil" % "2.1.5" % "provided",
  //"com.datastax.spark" %% "spark-cassandra-connector" % "1.3.0-M1" % "provided",
  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.3.0" % "provided"
)
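
Since the jar is built with sbt assembly, project/plugins.sbt presumably also declares the sbt-assembly plugin, something like the following (the plugin version here is an assumption, not taken from the question):

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")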
1 Answer

Accepted answer:

Try Spark 1.3.1.

The NoSuchMethodError is a binary-compatibility problem: the connector was compiled against a different version of Spark's Catalyst classes than the one you are running. Check Versions.scala in the spark-cassandra-connector source to see which Spark version the connector release you are using was built against.
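
Concretely, that means pinning the Spark artifacts in build.sbt to the version the connector targets. A minimal sketch, assuming (per the above) the connector in the lib folder was built against Spark 1.3.1:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.3.1" % "provided"
)

Since these are marked provided, the Spark installation on the cluster also has to be 1.3.1; changing only the build dependencies does not change the classes the driver and executors load at runtime.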