I am running a spark-submit job on the Cloudera (CDH 5.13) quickstart VM. The job connects to an HBase table using Spark with Scala, and the project is built in Eclipse. The job runs fine inside Eclipse, but when I submit it through spark-submit from the shell I get java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/Table.
Error:
[cloudera@quickstart bin]$ ./spark-submit --class sparkHiveHbaseInt2 /home/cloudera/Desktop/Demo_Final.jar
24/01/30 02:11:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/Table
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:42)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.client.Table
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 13 more
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[cloudera@quickstart bin]$
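From the stack trace, the class fails to resolve while spark-submit is loading my main class, which suggests the HBase client jars are on the classpath in Eclipse but not on the spark-submit driver classpath. A workaround I can sketch (assuming the hbase command is on the PATH of the quickstart VM) is to put the HBase classpath on the driver explicitly:

[cloudera@quickstart bin]$ ./spark-submit --class sparkHiveHbaseInt2 \
    --driver-class-path "$(hbase classpath)" \
    /home/cloudera/Desktop/Demo_Final.jar

Since the app sets master local[2], everything runs in the driver JVM, so the driver classpath should be enough; on a real cluster the jars would also have to reach the executors (for example via --jars).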
This is my code in Eclipse to connect to HBase:
import org.apache.spark.sql.{ Row, SaveMode, SparkSession }
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ Connection, ConnectionFactory, Get, HBaseAdmin, HTable, Put, Result, Scan }
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Bytes
object sparkHiveHbaseInt2 extends App {

  // set logging level to error
  Logger.getLogger("org").setLevel(Level.ERROR)

  // create the Spark config object
  val sparkConf = new SparkConf()
  sparkConf.setAppName("Credit_Card_Fraud_Detection")
  sparkConf.setMaster("local[2]")
  sparkConf.set("hive.metastore.uris", "thrift://localhost:9083")

  // use the Spark config object to create the Spark session
  val spark = SparkSession
    .builder()
    .config(sparkConf)
    .enableHiveSupport()
    .getOrCreate()

  // connect to HBase through ZooKeeper
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "localhost")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  val connection: Connection = ConnectionFactory.createConnection(conf)
  val tableName = connection.getTable(TableName.valueOf("card_lookup"))

  //SET hive.metastore.schema.verification=true;
  import spark.implicits._
  import spark.sql

  // start writing the Hive queries
  val df_ucl = sql("""
    with cte_rownum as (
      select card_id, amount, member_id, transaction_dt,
             first_value(postcode) over (partition by card_id order by transaction_dt desc) as postcode,
             row_number() over (partition by card_id order by transaction_dt desc) as rownum
      from bigdataproject.card_transactions
    )
    select card_id, member_id,
           round((avg(amount) + 3 * max(std)), 0) as ucl,
           max(score) as score,
           max(transaction_dt) as last_txn_time,
           max(postcode) as last_txn_zip
    from (
      select card_id, amount,
             c.member_id,
             m.score,
             c.transaction_dt,
             postcode,
             stddev(amount) over (partition by card_id order by (select 1) desc) as std
      from cte_rownum c
      inner join bigdataproject.member_score_bucketed m on c.member_id = m.member_id
      where rownum <= 10
    ) a
    group by card_id, member_id
  """)
  val df = df_ucl.select("card_id", "member_id", "ucl", "score", "last_txn_time", "last_txn_zip")
  //df.write.mode("append").saveAsTable("card_lookup")
  //println(sql("""select * from card_lookup""").count())
  println("Dataframe extract is completed")
  df.foreach { myRow =>
    // split the row into its string fields
    val myArray = myRow.mkString(",").split(",")
    val cardId      = myArray(0)
    val memberId    = myArray(1)
    val ucl         = myArray(2)
    val score       = myArray(3)
    val lastTxnTime = myArray(4)
    val lastTxnZip  = myArray(5)

    // build one Put per row, keyed on card_id, in the lkp_data column family
    val row = new Put(Bytes.toBytes(cardId))
    row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("member_id"), Bytes.toBytes(memberId))
    row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("ucl"), Bytes.toBytes(ucl))
    row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("score"), Bytes.toBytes(score))
    row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("last_txn_time"), Bytes.toBytes(lastTxnTime))
    row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("last_txn_zip"), Bytes.toBytes(lastTxnZip))
    tableName.put(row)
  }

  tableName.close()
  connection.close()
  println("Success")
}
Before submitting the Spark job, I export the project from Eclipse as a JAR.
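A plain Eclipse JAR export contains only my own classes, not the HBase client dependency, which would explain the ClassNotFoundException. One fix I am considering is building a fat JAR instead. Here is a minimal sketch of a build.sbt using the sbt-assembly plugin; the Spark and HBase version numbers are assumptions for my setup and would need checking against the cluster:

// build.sbt (sketch; versions are assumptions, align them with the cluster)
name := "Demo_Final"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Spark is provided by spark-submit at runtime, so mark it "provided"
  "org.apache.spark" %% "spark-sql"  % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.4.0" % "provided",
  // the HBase client is NOT provided by spark-submit, so bundle it
  "org.apache.hbase" % "hbase-client" % "1.2.0",
  "org.apache.hbase" % "hbase-common" % "1.2.0"
)

// resolve duplicate files when merging dependency jars
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _                        => MergeStrategy.first
}

// project/plugins.sbt needs: addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Then "sbt assembly" produces a single JAR I can pass to spark-submit as before. Staying in Eclipse, "Export > Runnable JAR file" with "Extract required libraries into generated JAR" should bundle the dependencies as well.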