【发布时间】:2019-09-08 13:49:06
【问题描述】:
我正在尝试为我已实现的自定义 MicroBatchReadSupport DataSource 创建一个测试。
为此,我想一次调用一个批处理,它将使用这个 DataSource 读取数据(我已经创建了适当的模拟)。我想调用一个批处理,验证是否读取了正确的数据(当前通过将其保存到内存接收器并检查输出),然后才调用下一个批处理并验证它的输出。
我找不到一种方法来逐个调用每个批次。
如果我使用streamingQuery.processAllAvailable(),批次将一个接一个地调用,不允许我单独验证每个批次的输出。使用trigger(Trigger.Once()) 也无济于事,因为它会执行一批,而我无法继续下一批。
有什么方法可以做我想做的事吗?
目前这是我的基本代码:
val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
val dsw: DataStreamWriter[Row] = dataFrame.writeStream
.format("memory")
.queryName("test_output")
val streamingQuery = dsw
.start()
streamingQuery.processAllAvailable()
【问题讨论】:
标签: apache-spark spark-structured-streaming