【问题标题】:How to test spark application in Scala如何在 Scala 中测试 Spark 应用程序
【发布时间】:2021-03-29 10:40:49
【问题描述】:

我有一个 Spark 应用程序,它从文件作为 RDD 接收数据并将其发送到另一个服务 (MyService)。处理方案如下:

object Sender {

def handle(myService: MyService) = {
    val rdd = getRdd()
    rdd.foreachPartition(partition => {
        partition.foreach(it =>
            val myData = new MyData(it)
            myService.send(myData))
        })
    }
}

MyService 如下所示:

class MyService() extends Serializable {
    def send(data: MyData) = {
        //do something
    }
}

在我的单元测试中,我尝试做这样的事情:

val myServiceMock = mock[MyService]
val data = new MyData()
Sender.handle(myServiceMock)
verify(myserviceMock).send(eqTo(data))

但是当 Spark 将数据从驱动程序传递到执行程序时,它会被序列化,实际上它是新的 MyServiceMock 对象。我得到通缉但没有被调用实际上,与这个模拟的交互为零。

有没有专门的工具可以测试这个案例?

【问题讨论】:

    标签: scala unit-testing mockito apache-spark-2.2


    【解决方案1】:

    这里的问题是如果你想使用一个模拟框架来查看一些方法你必须考虑几件事:

    • 是的,您正在评估在驱动程序中创建的对象实例,因此评估该实例没有意义。您需要评估正在执行程序中创建的实例。

    • 为了在执行程序中进行检查,您需要将验证放入 mapPartitions 函数中。我认为这是不可能的,因为模拟框架不会完全可序列化。

    • 也许有可能将服务模拟实例声明为瞬态。它将在每个执行器中创建一个模拟实例,以便您可以在 ma​​pPartitions 函数中使用 verify 方法。

    【讨论】:

      【解决方案2】:

      我解决了这个问题,如下所述。

      1. 我更改了handle() 方法,该方法现在将分区作为参数。它看起来像这样:
      object Sender {
      
      def handle(myService: MyService, partition: Iterator[MyData]) = {
          partition.foreach(it =>
              val myData = new MyData(it)
              myService.send(myData))
      }
      
      1. 在我的测试方法中,我做了这样的事情:
      import org.mockito.ArgumentMatchersSugar.eqTo
      import org.mockito.Mockito.{mock, verify, withSettings}
      import mypackage.MyService
      
      class SenderTest extends org.scalatest.FunSuite {
          test("send") {
              val testRdd = getTestRdd()
              testRdd.foreachPartition(partition => {
                  val testData = new MyData()
                  val myServiceMock = mock[MyService](classOf[MyService], withSettings.serializable())
                  Sender.handle(myServiceMock, partition)
                  verify(myServiceMock).send(eqTo(testData))
              }
          }
      }
      

      这里的关键是在我创建 mock 时使用 org.mockito.Mockito 中的 withSettings.serializable() 参数以使其可序列化。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-05-19
        • 2016-04-04
        • 2021-02-03
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多