How to test a Spark application in Scala


I have a Spark application that reads data from files as an RDD and sends it to another service (MyService). The processing looks like this:

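object Sender {

    def handle(myService: MyService): Unit = {
        val rdd = getRdd()
        // This closure (including the captured myService) is serialized and shipped to the executors
        rdd.foreachPartition(partition => {
            partition.foreach(it => {
                val myData = new MyData(it)
                myService.send(myData)
            })
        })
    }
}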

where MyService looks like this:

class MyService() extends Serializable {
    def send(data: MyData) = {
        //do something
    }
}

In my unit test I try something like this:

val myServiceMock = mock[MyService]
val data = new MyData()
Sender.handle(myServiceMock)
verify(myServiceMock).send(eqTo(data))

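But when Spark ships the closure from the driver to the executors, everything it captures is serialized, so the executors actually call new, deserialized copies of the mock rather than myServiceMock itself. As a result, Mockito reports "Wanted but not invoked" and "Actually, there were zero interactions with this mock."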

Is there a special tool for testing this case?
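The behaviour described above can be reproduced without Spark or Mockito. The sketch below is a minimal, stdlib-only illustration that assumes Java serialization (which Spark uses for closures) is the relevant mechanism; CountingService and SerializationDemo are hypothetical names, not part of the question's code. Calling send on the deserialized copy leaves the original's counter untouched, which is why verify on the driver-side mock sees zero interactions.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for MyService: counts how many times send() is called.
class CountingService extends Serializable {
  var calls: Int = 0
  def send(data: String): Unit = calls += 1
}

object SerializationDemo {
  // Serialize and deserialize an object, roughly what happens to a closure
  // (and everything it captures) on its way from the driver to an executor.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (calls seen by the driver-side original, calls seen by the copy).
  def run(): (Int, Int) = {
    val original = new CountingService
    val copy = roundTrip(original) // plays the role of the executor-side instance
    copy.send("record")            // the work happens on the copy only
    (original.calls, copy.calls)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Running it prints (0,1): the original saw no calls, the copy saw one.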



Answer 1

The problem here is that if you want to use a mocking framework to check whether some method was called, you have to take several things into account:

  • You are verifying an object instance that was created in the driver, so there is no point in checking that instance: the calls happen on the copies deserialized in the executors, and those are the instances you would need to verify.

  • To do that check in the executors, you would need to put the verification inside the mapPartitions function. I don't think that is possible, because the mocking framework is not going to be fully serializable.

  • Maybe it is possible to declare the service mock instance as transient (for example as a @transient lazy val, which is re-initialized after deserialization). That way a mock instance would be created in each executor, so you could call the verify method inside the mapPartitions function.
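The transient idea relies on standard Java serialization rules: a @transient field is not written to the stream and comes back as null in the deserialized copy, so the copy can build its own instance on first use. The minimal stdlib-only sketch below shows that mechanism with a hand-written lazy getter (equivalent in spirit to the @transient lazy val shorthand often used in Spark code); Recorder, ServiceHolder and TransientDemo are hypothetical names, and the sketch does not check how a Mockito mock would behave in that position.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical dependency that is NOT serializable (think: a mock or a client).
class Recorder {
  val sent: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def send(data: String): Unit = sent += data
}

// Hypothetical holder captured by a closure. The @transient field is skipped
// when the holder is serialized and is null on the other side, so `service`
// builds a fresh Recorder in each deserialized copy.
class ServiceHolder extends Serializable {
  @transient private var cached: Recorder = null
  def service: Recorder = {
    if (cached == null) cached = new Recorder
    cached
  }
}

object TransientDemo {
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (same Recorder instance?, sends seen on the driver side, sends seen on the copy).
  def run(): (Boolean, Int, Int) = {
    val driverSide = new ServiceHolder
    driverSide.service.send("on the driver")  // Recorder exists before serialization
    val executorSide = roundTrip(driverSide)  // succeeds because the Recorder is not written
    executorSide.service.send("on an executor")
    (driverSide.service eq executorSide.service,
      driverSide.service.sent.size,
      executorSide.service.sent.size)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

It prints (false,1,1): each side ends up with its own Recorder, and without @transient the round trip would fail with a NotSerializableException because Recorder is not serializable.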

Answer 2

I solved this as described below.

  1. I changed the handle() method, which now takes the partition as a parameter. It looks like this:
object Sender {

    // Plain Scala: no Spark API here, the caller decides where each partition comes from
    def handle(myService: MyService, partition: Iterator[MyData]): Unit = {
        partition.foreach(it => {
            val myData = new MyData(it)
            myService.send(myData)
        })
    }
}
  2. In my test method I do something like this:
import org.mockito.ArgumentMatchersSugar.eqTo
import org.mockito.Mockito.{mock, verify, withSettings}
import mypackage.MyService

class SenderTest extends org.scalatest.FunSuite {
    test("send") {
        val testRdd = getTestRdd()
        testRdd.foreachPartition(partition => {
            // Runs inside a Spark task: the mock is created, used and verified there
            val testData = new MyData()
            val myServiceMock = mock(classOf[MyService], withSettings().serializable())
            Sender.handle(myServiceMock, partition)
            verify(myServiceMock).send(eqTo(testData))
        })
    }
}

The key here is to pass withSettings().serializable() from org.mockito.Mockito when creating the mock, which makes the mock serializable.
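Because handle() now receives a plain Iterator, its logic can also be tested on the driver without a SparkContext or any serialization. A minimal sketch under stated assumptions: MyData is assumed to be a case class (so equality is structural, which is also what eqTo relies on), MyService is assumed to be extendable, handle forwards items directly instead of wrapping them in new MyData, and RecordingService is a hypothetical hand-written fake used instead of Mockito to keep the example dependency-free.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the question's types.
final case class MyData(value: String)

class MyService extends Serializable {
  def send(data: MyData): Unit = () // the real implementation talks to the remote service
}

// Same shape as the refactored Sender.handle: the Spark-specific part
// (foreachPartition) stays with the caller, so this is plain Scala.
object Sender {
  def handle(myService: MyService, partition: Iterator[MyData]): Unit =
    partition.foreach(myData => myService.send(myData))
}

// Hypothetical fake that records every call instead of sending anything.
class RecordingService extends MyService {
  val sent: ArrayBuffer[MyData] = ArrayBuffer.empty[MyData]
  override def send(data: MyData): Unit = sent += data
}

object SenderPlainTest {
  def run(): List[MyData] = {
    val fake = new RecordingService
    Sender.handle(fake, Iterator(MyData("a"), MyData("b")))
    fake.sent.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

It prints List(MyData(a), MyData(b)). The same test works with a Mockito mock and verify on the driver, since nothing is serialized. Keeping verification out of foreachPartition also avoids a spurious failure on empty partitions, which occur whenever the test RDD has more partitions than elements.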