Spark and javax.crypto

I am trying to learn about encryption and I came up with the following code:

import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

object JavaCryptoEncryption {

  val Algorithm = "AES/CBC/PKCS5Padding"
  // A fixed all-zero IV is fine for experimenting, but insecure in production.
  val IvSpec = new IvParameterSpec(new Array[Byte](16))

  def encrypt(text: String, b64secret: String): String = {
    val cipher = Cipher.getInstance(Algorithm)
    val key = new SecretKeySpec(Base64.getDecoder.decode(b64secret), "AES")
    cipher.init(Cipher.ENCRYPT_MODE, key, IvSpec)
    new String(Base64.getEncoder.encode(cipher.doFinal(text.getBytes("utf-8"))), "utf-8")
  }

  def decrypt(text: String, b64secret: String): String = {
    val cipher = Cipher.getInstance(Algorithm)
    val key = new SecretKeySpec(Base64.getDecoder.decode(b64secret), "AES")
    cipher.init(Cipher.DECRYPT_MODE, key, IvSpec)
    new String(cipher.doFinal(Base64.getDecoder.decode(text.getBytes("utf-8"))), "utf-8")
  }
}

Elsewhere in my system I define and store the key. I then apply JavaCryptoEncryption.encrypt and JavaCryptoEncryption.decrypt to a String value and everything works fine. However, when I convert them to UDFs and apply them to a column of a DataFrame, I get org.apache.spark.SparkException: Task not serializable. Similar code (without the IV) works for AES/ECB/PKCS5Padding. Do certain modes not support parallelism? Is there a way around this? Or is there a different reason?
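
For illustration, here is a sketch of roughly how I wire up the UDF (names like df, the column "value", and the variable b64secret are simplified placeholders):

import org.apache.spark.sql.functions.{col, udf}

// Placeholder wiring: assumes a DataFrame `df` with a string column "value"
// and a Base64-encoded key `b64secret` defined on the driver.
val encryptUdf = udf((s: String) => JavaCryptoEncryption.encrypt(s, b64secret))
val encrypted = df.withColumn("encrypted", encryptUdf(col("value")))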

1 Answer

Rayan Ral:

I think the problem is that you're creating IvParameterSpec instance on the driver, and then Spark is trying to serialize it and sent to the executors (where all UDFs are actually executed). Maybe, try putting all object-creation-related code inside UDF itself, and just provide your key as a UDF parameter? That way you'll need to send only a String to workers, so there shouldn't be a serialization problem.