(first StackOverflow post - I've searched but may have overlooked an answer - if so, I apologize...)
I have a local standalone Spark instance running and have tested it quite thoroughly with Sparklyr (writing to and from memory, creating tables, pipelines, etc.). I also have a working Kafka installation from which I can stream data; the idea is to process this stream and save some results in Cassandra (eventually a cluster).
I have installed a standalone Cassandra instance (listening on localhost) and created the following keyspace and table via cqlsh 127.0.0.1 9042 (so the connection to Cassandra works), with authentication switched off so that it cannot be a source of error:
CREATE KEYSPACE IF NOT EXISTS SparkTestKeyspace
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE SparkTestKeyspace;
CREATE TABLE emp(
emp_id int PRIMARY KEY,
emp_name text,
emp_city text,
emp_sal varint,
emp_phone varint
);
INSERT INTO emp (emp_id, emp_name, emp_city, emp_phone, emp_sal) VALUES (1, 'ram', 'Hyderabad', 9848022338, 50000);
INSERT INTO emp (emp_id, emp_name, emp_city, emp_phone, emp_sal) VALUES (2, 'robin', 'Hyderabad', 9848022339, 40000);
INSERT INTO emp (emp_id, emp_name, emp_city, emp_phone, emp_sal) VALUES (3, 'rahman', 'Chennai', 9848022330, 45000);
SELECT * FROM emp;
 emp_id | emp_city  | emp_name | emp_phone  | emp_sal
--------+-----------+----------+------------+---------
1 | Hyderabad | ram | 9848022338 | 50000
2 | Hyderabad | robin | 9848022339 | 40000
3 | Chennai | rahman | 9848022330 | 45000
I've configured the Sparklyr environment as follows and get a connection to Spark with no problem. (I've added options(sparklyr.log.console = TRUE) to see as many errors as possible; see below.)
#Paranoid about whether the environment is correct, so initialize it explicitly in R
Sys.setenv(JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64")
Sys.setenv(SPARK_HOME = "/home/x/spark/spark-3.2.1-bin-hadoop3.2")
#Added jar paths for cassandra and kafka (didn't help)
Sys.setenv(CLASSPATH = "/usr/share/cassandra/lib/:/opt/kafka/libs/*")
library(sparklyr)
LConfig <- spark_config()
LConfig$spark.executor.memory <- "16G"
LConfig$spark.eventLog.dir <- "file:///home/x/spark/spark-3.2.1-bin-hadoop3.2/log"
LConfig$spark.history.fs.logDirectory <- "file:///home/x/spark/spark-3.2.1-bin-hadoop3.2/log"
LConfig$'spark.cassandra.connection.host' <- "127.0.0.1"
LConfig$'spark.cassandra.connection.port' <- "9042"
LConfig$'spark.cassandra.output.batch.grouping.key' <- "none"
LConfig$'sparklyr.connect.packages' <- "com.datastax.spark:spark-cassandra-connector_2.12:3.3.0"
sc <- spark_connect(master = "local", version = "3.2.1",
config = LConfig
)
I generate a reference to the table (mapped, not in memory - see below) as follows:
EmpPointer <- spark_read_source(
sc,
name = "emp",
source = "org.apache.spark.sql.cassandra",
options = list(keyspace = "sparktestkeyspace", table = "emp"),
memory = FALSE)
After which I can "see" the table and query its columns (metadata):
library(dplyr)
tbl_vars(EmpPointer)
<dplyr:::vars>
[1] "emp_id" "emp_city" "emp_name" "emp_phone" "emp_sal"
However, when I try to draw data from the table, etc. I get the following Java error:
SparkTables <- src_tbls(sc)
tbl_name <- SparkTables[1]
tbl_name
[1] "emp_939777ab_c0f4_4494_bc14_31b9b3356459"
library(DBI)
dbGetQuery(sc, paste("SELECT 'emp_city' FROM ", tbl_name))
Error in value[[3L]](cond) :
Failed to fetch data: java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/partitioning/KeyGroupedPartitioning
at com.datastax.spark.connector.datasource.CassandraScan.outputPartitioning(CassandraScanBuilder.scala:311)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.outputPartitioning(DataSourceV2ScanExecBase.scala:87)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.outputPartitioning$(DataSourceV2ScanExecBase.scala:81)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.outputPartitioning(BatchScanExec.scala:35)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:53)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:
The same error appears if I try to describe the table with
sdf_describe(EmpPointer)
or
glimpse(EmpPointer)
or if I try to load it into memory with
EmpPointer <- spark_read_source(
sc,
name = "emp",
source = "org.apache.spark.sql.cassandra",
options = list(keyspace = "sparktestkeyspace", table = "emp"),
memory = TRUE)
As mentioned, I can import and export text/JSON/XML/CSV, but connecting to Cassandra eludes me. It's probably obvious, but does someone know where the error comes from and how I can fix it?
23/06/14 20:45:58 ERROR sparklyr: Gateway (6527) failed calling describe on 31: java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/partitioning/KeyGroupedPartitioning
(There are about 5 feet of Java errors following this one that I can post if useful.)
I have also removed all of the cached Spark packages (under spark/work and .ivy) so that fresh copies are downloaded directly from the repositories. No change.
I then used this article as a guide https://dzone.com/articles/spark-cassandra-connector-on-spark-shell and connected directly with spark-shell:
~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.3.0 --conf spark.cassandra.connection.host=127.0.0.1
sc.stop
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
sc.cassandraTable("sparktestkeyspace", "emp").select("emp_id").as((id:Int) => (id,1)).reduceByKey(_ + _).collect.foreach(println)
(1,1)
(2,1)
(3,1)
So it would seem that the problem lies with Sparklyr or with how I'm doing things, since the connector is the same version and the connection to localhost works, just as it does for cqlsh.
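In case it helps with diagnosis, here is an untested sketch of how I could double-check which connector jar the Sparklyr session actually loaded (assuming the session is still open); listJars() is a method on the JVM SparkContext:
# Untested sketch: list the jars the running session added, to see which
# spark-cassandra-connector version actually ended up on the classpath
library(sparklyr)
ctx  <- spark_context(sc)                 # handle to the JVM SparkContext
jars <- unlist(invoke(ctx, "listJars"))   # calls SparkContext.listJars()
grep("cassandra", jars, value = TRUE)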
Best regards Les James
Here is my R environment:
> print(sessionInfo())
R version 4.3.0 (2023-04-21)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Linux Mint 21.1
Matrix products: default
BLAS: /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3
LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/libopenblasp-r0.3.20.so; LAPACK version 3.10.0
locale:
[1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
[5] LC_MONETARY=de_DE.UTF-8 LC_MESSAGES=en_US.UTF-8 LC_PAPER=de_DE.UTF-8 LC_NAME=C
[9] LC_ADDRESS=C LC_TELEPHONE=C LC_MEASUREMENT=de_DE.UTF-8 LC_IDENTIFICATION=C
time zone: Europe/Berlin
tzcode source: system (glibc)
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] DBI_1.1.3 dplyr_1.1.2 sparklyr_1.8.1
That's a version problem with your connector: spark-cassandra-connector 3.3.0 is built for Spark 3.3.x, while you are running Spark 3.2.1, and the class it cannot find (KeyGroupedPartitioning) only exists from Spark 3.3 onward. You'll have to check the version of Spark you're running and match the connector to it; see the compatibility table:
https://github.com/datastax/spark-cassandra-connector/tree/master#version-compatibility
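For example, here is a minimal sketch of the Sparklyr side of the fix, assuming you stay on Spark 3.2.1 and that a 3.2.x connector release (e.g. 3.2.0) is the right match per that table; only the package coordinate changes:
# Untested sketch: keep Spark 3.2.1 but request a connector built for Spark 3.2.x
library(sparklyr)
LConfig <- spark_config()
LConfig$'spark.cassandra.connection.host' <- "127.0.0.1"
LConfig$'spark.cassandra.connection.port' <- "9042"
# Match the connector line to the Spark line (3.2.x connector for Spark 3.2.x)
LConfig$'sparklyr.connect.packages' <- "com.datastax.spark:spark-cassandra-connector_2.12:3.2.0"
sc <- spark_connect(master = "local", version = "3.2.1", config = LConfig)
Alternatively, keep connector 3.3.0 and point SPARK_HOME at a Spark 3.3.x build instead.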