【问题标题】:Ordering of Records in Stream流中记录的排序
【发布时间】:2016-07-13 14:36:24
【问题描述】:

以下是我的一些疑问:

我有两个不同的流stream1stream2,其中的元素是按顺序排列的。

1) 现在,当我对这些流中的每一个执行keyBy 时,会保持顺序吗? (因为这里的每个组都只会发送给一个任务管理器) 我的理解是,记录将是为了一组,在这里纠正我。

2) 在两个流上的keyBy 之后,我正在共同组以获取匹配和不匹配的记录。订单也会在这里维护吗?因为这也适用于KeyedStream。 我正在使用EventTimeAscendingTimestampExtractor 生成timestampwatermark

3) 现在我想使用 map/flatmap 对从 2)​​ 得到的 matching_nonMatchingStream 执行序列检查。 我是否需要在此处再次执行keyBy,或者如果我保持链接,matching_nonMatchingStream 是否会在相同的TaskManager 中运行? 我在这里的理解是,链条将在这里工作,纠正我,感到困惑。

4) slotSharingGroup - 你能详细描述一下吗 根据文档:设置此操作的插槽共享组。如果可能,同一槽共享组中的并行操作实例将位于同一TaskManager 槽中。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    关于订购保证

    此页面提供了很好的概述和解释,以及订购保证:https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows

    要点是:

    在每个并行流分区内保持顺序。有关流分区的说明,请参见此处:https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows

    对于像“keyBy()”或“rebalance()”这样改变分区的操作,每个源流和目标流分区的顺序都保持不变,这意味着每对发送和接收操作符。

    正如 Matthias 所提到的,如果一个组(由一个键定义,在一个接收目标运算符上运行)从多个发送者那里获取元素,则没有明确定义的元素严格排序。使用事件时间等概念,您可以根据数据(附加的时间戳)强加有意义的排序。

    【讨论】:

    • 是的,我正在使用事件时间进行窗口化,但无法理解我在哪里对元素进行排序(我们实际上已经在应用方法中对数据乱序的一个用例进行了排序)现在考虑到我能够对记录进行排序,所以根据文档,如果我只应用一对一的流,所有下游操作员都将按顺序接收元素。只有我做 keyBy 或 rebalance 或其他操作,排序才会生效,对吧?
    【解决方案2】:

    1) 是和不是。 Flink 使用所谓的Watermarks 来跟踪排序。这确保了可以将记录分配给正确的窗口,并且在所有数据可用之前不会关闭窗口。但是,不能保证每个组都有严格的顺序(因为parallel incoming data)。在组之间,根本没有排序保证。

    2) 与 (1) 的答案基本相同。

    3) 您无需再次使用keyBy。默认情况下,map/flatMap 将被链接。

    4) 见https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/general_arch.html#the-processes

    【讨论】:

    • Flink 不提供任何排序​​操作。您需要为此编写 UDF 代码。
    • @matthias-j-sax : 1) 所以如果我想保持每个组的严格顺序,我将不得不执行排序操作,对吗? 2)因此,如果不能保证订单,则使用 AscendingTimestampExtractor 没有意义 3)现在,当水印大于前一个时,会发出水印,这是否意味着我必须维护 current 和 prevTimestamp 并检查 getCurrentWaterMark()方法if(currentTimestamp > prevTimestamp)
    • 您仍然可以使用AscendingTimestampExtractor——您使用的提取器取决于您的用例语义并且独立于订购保证。如果要进行排序,则需要将所有元组缓冲到水印之间。当您收到第二个水印时,您可以对所有缓冲的元组进行排序并以正确的顺序发出。最后,清空缓冲区并从头开始下一个水印水印间隔。水印保证不会再有关于当前水印的迟到的数据。因此,对缓冲区进行排序将给出正确的结果。
    • 我不清楚我应该在哪里缓冲我的元素,你是想说缓冲getCurrentTimestamp()中的元素并对getCurrentWatermark()中的元素进行排序(我在这里很困惑)当您收到第二个水印时,您可以对所有缓冲元组进行排序并以正确的顺序发出 - 没有得到这个,我的理解是我们将对getCurrentWatermark 中的元素进行排序,但我们究竟在哪里发出结果?
    • 我们可以通过电子邮件讨论这个问题吗——它变得越来越复杂了...... mjsax@apache.org(并且评论字段的大小是有限的)。这与您的原始问题不再相关。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-11-02
    • 1970-01-01
    • 2017-07-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多