[Posted]: 2019-02-20 08:08:03
[Question]:
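I am trying to write unit tests for a Spark Streaming application that uses DStreams.
I found the following suite very useful: StreamingSuiteBase. It contains a method called testOperation, to which you pass the input, the operation under test, and the expected output. It then verifies that the expected output matches the actual output.
The problem I am facing is that during the equality check I get exactly the same object, but wrapped in different collections:
- Expected: List(myObject)
- Actual: Array(myObject)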
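The mismatch above comes down to how Scala compares collections. As a minimal sketch using only the standard library (the string "myObject" and the object name CollectionEquality are hypothetical stand-ins, not part of the test suite), a List never compares equal to an Array with ==, but it does compare equal to the same Array once that Array is viewed as a Seq:

```scala
object CollectionEquality {
  // Hypothetical stand-in for the object produced by the stream
  val myObject = "myObject"

  val expected: List[String] = List(myObject)
  val actual: Array[String] = Array(myObject)

  // An Array is a plain JVM array, so List == Array is false
  // even when the elements are identical
  val direct: Boolean = expected == actual

  // Viewing the array as a Seq gives element-wise Seq equality
  val asSeq: Boolean = expected == actual.toSeq

  def main(args: Array[String]): Unit =
    println(s"direct=$direct asSeq=$asSeq")
}
```

This only illustrates the comparison semantics; whether converting the collections is the right fix for the suite is a separate question.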
testOperation is defined as follows:
def testOperation[U: ClassTag, V: ClassTag](
    input: Seq[Seq[U]],
    operation: DStream[U] => DStream[V],
    expectedOutput: Seq[Seq[V]],
    ordered: Boolean
  ) (implicit equality: Equality[V]): Unit = {
  val numBatches = input.size
  withOutputAndStreamingContext(setupStreams[U, V](input, operation)) {
    (outputStream, ssc) =>
      val output: Seq[Seq[V]] = runStreams[V](
        outputStream, ssc, numBatches, expectedOutput.size)
      verifyOutput[V](output, expectedOutput, ordered)
  }
}
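This does not allow me to pass List(Array(myObject)) as the expected output.
So my second option was to modify the verifyOutput method. I plan to override it in my own code, adding just a few lines to build List(Array(myObject)). Something like this (updated):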
override def verifyOutput[V](output: Seq[Seq[V]],
    expectedOutput: Seq[Seq[V]],
    ordered: Boolean)
    (implicit evidence$1: ClassTag[V], equality: Equality[V]): Unit = {
  super.verifyOutput(output, expectedOutput, ordered)
}
  // These three lines are what I am planning to add
  val sq = expectedOutput(0)
  val ssq = sq(0)
  val newOutput = Seq(Array(ssq))
  logInfo("--------------------------------")
  logInfo("output.size = " + output.size)
  logInfo("output")
  output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
  logInfo("expected output.size = " + expectedOutput.size)
  logInfo("expected output")
  expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
  logInfo("--------------------------------")
  // Match the output with the expected output
  assert(output.size === expectedOutput.size, "Number of outputs do not match")
  if (ordered) {
    for (i <- output.indices)
      equalsOrdered(output(i), expectedOutput(i))
  } else {
    for (i <- output.indices)
      equalsUnordered(output(i), expectedOutput(i))
  }
  logInfo("Output verified successfully")
}
The full StreamingSuiteBase can be found here
But I get the following error in Eclipse:
method verifyOutput overrides nothing. Note: the super classes of class myClass contain the following, non final members named verifyOutput: def verifyOutput[V](output: Seq[Seq[V]], expectedOutput: Seq[Seq[V]], ordered: Boolean)(implicit evidence$1: scala.reflect.ClassTag[V], implicit equality: org.scalactic.Equality[V]): Unit
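The evidence$1 parameter in that message is how the compiler encodes a context bound such as [V: ClassTag]: the bound becomes an extra implicit parameter placed before any explicitly declared implicit parameters. The following is a minimal, self-contained sketch of that encoding. Base, Derived, and describe are hypothetical names, and the standard library's Ordering stands in for Equality, so this is not the StreamingSuiteBase API itself and it does not diagnose the error above.

```scala
import scala.reflect.ClassTag

trait Base {
  // Context-bound form. The compiler treats this as
  //   describe[V](xs: Seq[V])(implicit evidence$1: ClassTag[V], ord: Ordering[V])
  def describe[V: ClassTag](xs: Seq[V])(implicit ord: Ordering[V]): String =
    // toArray needs the ClassTag, sorted needs the Ordering
    xs.sorted.toArray.mkString(",")
}

class Derived extends Base {
  // Same type parameters and parameter lists as Base.describe,
  // so the compiler accepts it as an override
  override def describe[V: ClassTag](xs: Seq[V])(implicit ord: Ordering[V]): String =
    "derived " + super.describe(xs)
}

object ContextBoundDemo {
  def main(args: Array[String]): Unit =
    println(new Derived().describe(Seq(3, 1, 2)))
}
```

In this sketch the override compiles because the parameter types resolve to the same classes in both declarations; an "overrides nothing" error generally means the compiler sees some difference between the two signatures.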
Here is a simplified version of my test case:
import org.apache.spark.streaming.dstream.DStream
import org.scalatest.FunSuite

// FunSuite is a class, so it has to be extended first; the StreamingSuiteBase trait is mixed in after it
class myClass extends FunSuite with StreamingSuiteBase {
  test("ExtCustProfileHbaseAPI") {
    // Here I would be generating my input and expected output
    val inputData = new myInitialObject()
    val expOutput = new myFinalObject()
    testOperation(inputData, processTest _, expOutput, ordered = false)
  }

  def processTest(input: DStream[myInitialObject]): DStream[(String, myFinalObject)] = {
    // Operation under test
    val result = operation(input)
    result
  }

  // Here I added the override def verifyOutput[V: ClassTag]...
}
What am I doing wrong?
[Comments]:
- Did you extend the trait StreamingSuiteBase? Can you show the definition of your test class?
- @AlexeyNovakov I have added the definition of the test class.
Tags: scala apache-spark scalatest