【问题标题】:Spark Structured Streaming - testing one batch at a timeSpark Structured Streaming - 一次测试一批
【发布时间】:2019-09-08 13:49:06
【问题描述】:

我正在尝试为我已实现的自定义 MicroBatchReadSupport DataSource 创建一个测试。

为此,我想一次调用一个批处理,它将使用这个 DataSource 读取数据(我已经创建了适当的模拟)。我想调用一个批处理,验证是否读取了正确的数据(当前通过将其保存到内存接收器并检查输出),然后才调用下一个批处理并验证它的输出。

我找不到一种方法来逐个调用每个批次。 如果我使用streamingQuery.processAllAvailable(),批次将一个接一个地调用,不允许我单独验证每个批次的输出。使用trigger(Trigger.Once()) 也无济于事,因为它会执行一批,而我无法继续下一批。

有什么方法可以做我想做的事吗?

目前这是我的基本代码:

val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
    val dsw: DataStreamWriter[Row] = dataFrame.writeStream
      .format("memory")
      .queryName("test_output")
    val streamingQuery = dsw
      .start()
    streamingQuery.processAllAvailable()

【问题讨论】:

    标签: apache-spark spark-structured-streaming


    【解决方案1】:

    我最终做的是使用运行一次的 DataStreamWriter 设置测试,但将当前状态保存到检查点。因此,每次我们调用dsw.start() 时,都会根据检查点从最新的偏移量恢复新批次。我还将数据保存到 globalTempView 中,因此我将能够以与使用内存接收器类似的方式查询数据。为此,我使用foreachBatch(仅从 Spark 2.4 开始可用)。

    这是在代码中:

    val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
    val dsw = getNewDataStreamWriter(dataFrame)
    
    testFirstBatch(dsw)
    testSecondBatch(dsw)
    
    private def getNewDataStreamWriter(dataFrame: DataFrame) = {
        val checkpointTempDir = Files.createTempDirectory("tests").toAbsolutePath.toString
        val dsw: DataStreamWriter[Row] = dataFrame.writeStream
        .trigger(Trigger.Once())
        .option("checkpointLocation", checkpointTempDir)
        .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
            batchDF.createOrReplaceGlobalTempView("input_data")
        }
        dsw
    }
    

    每个批次的实际测试代码(例如testFirstBatch)是:

    val rows = processNextBatch(dsw)
    assertResult(10)(rows.length)
    
    private def processNextBatch(dsw: DataStreamWriter[Row]) = {
        val streamingQuery = dsw
            .start()
        streamingQuery.processAllAvailable()
        sparkSession.sql("select * from global_temp.input_data").collect()
    }
    

    【讨论】:

      猜你喜欢
      • 2021-07-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-19
      • 2019-06-25
      • 1970-01-01
      • 2020-09-12
      • 1970-01-01
      相关资源
      最近更新 更多