I have a requirement to encrypt/decrypt data using a JKS keystore file inside a Spark UDF. When I run my application through spark-shell I get the error below:
Caused by: java.net.MalformedURLException: no protocol: CDA_KEYSTORE_PT140.jks
I know that Spark treats UDFs as a black box, so a bare file name inside one is not resolved against HDFS. I therefore tried to ship a copy of the file to the local working directory of each executor using:
/usr/hdp/current/spark2-client/bin/spark-shell --files CDA_KEYSTORE_PT140.jks
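For context on what --files does: it copies the named file into the working directory of each executor, and the executor-local absolute path of that copy can be resolved with the standard org.apache.spark.SparkFiles API. A minimal sketch:

import org.apache.spark.SparkFiles

// On an executor, returns the absolute local path of the copy shipped via --files
val localKeystorePath = SparkFiles.get("CDA_KEYSTORE_PT140.jks")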
My UDF is as below:
def impl2(col1: String): String = {
  val pilotCrypto = new PilotCryptoImpl
  pilotCrypto.setKey1("sensitive data")
  pilotCrypto.setKey2("sensitive data")
  pilotCrypto.setKey3("sensitive data")
  pilotCrypto.init()
  EncryptionUtil.setCrypto(pilotCrypto)
  val psg = new IvParamSpecGenerator(true)
  val crypto = new JceCryptoImpl
  crypto.setKeystoreURL("CDA_KEYSTORE_PT140.jks")
  crypto.setKeystoreType("JCEKS")
My complete code is below; it lives in a .scala file which I run through spark-shell.
import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
// The crypto classes come from the internal framework seen in the stack trace;
// only JceCryptoImpl's package is confirmed there, and the imports for
// PilotCryptoImpl, EncryptionUtil and IvParamSpecGenerator are not shown in the question
import com.telus.framework.crypto.impl.jce.JceCryptoImpl
def createDataframeFromSql(hc: SparkSession, sql: String): DataFrame = {
  hc.sql(sql)
}
def impl2(col1: String): String = {
  val pilotCrypto = new PilotCryptoImpl
  pilotCrypto.setKey1("EbT5a8Fuq")
  pilotCrypto.setKey2("aYt2gv6R")
  pilotCrypto.setKey3("9bFp3Gz4k")
  pilotCrypto.init()
  EncryptionUtil.setCrypto(pilotCrypto)
  val psg = new IvParamSpecGenerator(true)
  val crypto = new JceCryptoImpl
  crypto.setKeystoreURL("CDA_KEYSTORE_PT140.jks")
  crypto.setKeystoreType("JCEKS")
  crypto.setKeyAlias(EncryptionUtil.decryptHex("sensitive data"))
  crypto.setKeyPassword(EncryptionUtil.decryptHex("sensitive data"))
  crypto.setCipherTransformation("AES/CBC/PKCS5Padding")
  crypto.setAlgorithmParamSpecGenerator(psg)
  crypto.setEncodeBase64(true)
  crypto.init()
  val encryptedBytes1 = col1.getBytes
  new String(crypto.decrypt(encryptedBytes1))
}
def processdata(): Unit = {
  try {
    val hc = SparkSession.builder.appName("HivetoSpark").config("spark.sql.warehouse.dir", "namenode/apps/hive/warehouse").enableHiveSupport().getOrCreate()
    import hc.implicits._
    hc.sql("""set hive.exec.dynamic.partition=true""")
    hc.sql("""set hive.exec.dynamic.partition.mode=nonstrict""")
    hc.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    hc.sql("""set hive.merge.tezfiles=true""")
    hc.sql("""set hive.merge.smallfiles.avgsize=256000000""")
    hc.sql("""set hive.merge.size.per.task=256000000""")
    hc.sql("""set hive.merge.sparkfiles=true""")
    val start = System.currentTimeMillis()
    val impl2udf = udf(impl2 _)
    val query1 = """select * from table"""
    val fraud_trn_df1 = createDataframeFromSql(hc, query1)
    fraud_trn_df1.show(5, false)
    val fraud_trn_df2 = fraud_trn_df1.withColumn("FT_PRIM_NUM_AMT_decr", impl2udf(col("ft_prim_num_amt")))
    val fraud_trn_df3 = fraud_trn_df2.withColumn("FT_SECONDARY_NUMBER_AMOUNT_decr", impl2udf(col("ft_secndy_num_amt")))
    fraud_trn_df3.show(5, false)
    val end = System.currentTimeMillis()
    val runTimeInSec = (end - start) / 1000.0
    println(s"runTimeInSec: ${runTimeInSec}sec")
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
I call processdata() at the end of the file:
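processdata()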
The stack trace is below:
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_133$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.MalformedURLException: no protocol: CDA_KEYSTORE_PT140.jks
at java.net.URL.<init>(URL.java:593)
at java.net.URL.<init>(URL.java:490)
at java.net.URL.<init>(URL.java:439)
at com.telus.framework.crypto.impl.jce.JceCryptoImpl.init(JceCryptoImpl.java:78)
at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.impl2(<console>:56)
at $line29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:59)
at $line29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:59)
That URL is missing the protocol, exactly as the error message says: java.net.URL rejects a bare file name because there is no scheme in front of it. It should be written with an explicit scheme such as hdfs: or file:.
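For a keystore kept on HDFS, for example (host, port and path here are placeholders, not values from the question):

crypto.setKeystoreURL("hdfs://<namenode-host>:<port>/path/to/CDA_KEYSTORE_PT140.jks")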
If you have to run with a local file, then use the file: scheme instead.
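Again the path is a placeholder; the file must exist at that path on the machine where the UDF runs:

crypto.setKeystoreURL("file:///path/to/CDA_KEYSTORE_PT140.jks")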
With Spark I prefer HDFS files, since they are reachable by all workers. Otherwise, with local files, you have to copy the file to every worker node yourself; the sketch below shows how the --files flag you already pass can do that copying for you.
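A minimal sketch combining the two pieces, assuming the crypto API from the question (SparkFiles is standard Spark):

import org.apache.spark.SparkFiles

// --files already placed a copy of the keystore on each executor;
// resolve its absolute local path there and pass it as a file: URL
val keystorePath = SparkFiles.get("CDA_KEYSTORE_PT140.jks")
crypto.setKeystoreURL("file://" + keystorePath)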