【问题标题】:spark streaming pick latest event for every record per trigger process interval火花流为每个触发过程间隔的每条记录选择最新事件
【发布时间】:2021-04-20 05:09:28
【问题描述】:

我们有一个 spark 流式处理(spark 版本 2.4.0)作业,它使用一个 Kafka 主题(4 个分区),其中包括业务更改为带有 ID 的 json。 这些 Kafka 值还包括 RecordTime 字段和 json 对象内的其他字段。 此流式作业根据 Id 字段更新 Kudu 表。

一段时间后,我们注意到,某些更新实际上并未反映某些 id 字段值的最新状态。 我们假设每个分区有 4 个不同的执行程序处理,当其中一个比另一个更早完成时,它会更新目标 Kudu 表。 所以如果我们有如下值:

(Id=1, val=A, RecordTime: 10:00:05 ) partition1
(Id=2, val=A, RecordTime: 10:00:04 ) partition1
(Id=1, val=B, RecordTime: 10:00:07 ) partition2
(Id=1, val=C, RecordTime: 10:00:06 ) partition3
(Id=2, val=D, RecordTime: 10:00:05 ) partition1
(Id=2, val=C, RecordTime: 10:00:06 ) partition4
(Id=1, val=E, RecordTime: 10:00:03 ) partition4

那么 Kudu 表应该是这样的:

Id Value RecordTime
1 B 10:00:07
2 C 10:00:06

但是,有时我们会看到这样的 Kudu 表:

Id Value RecordTime
1 A 10:00:05
2 C 10:00:06

触发间隔为 1 分钟。

那么,如何实现目标Kudu表的有序更新呢。

  1. 我们是否应该使用单个分区进行排序,但如果这样做利弊?
  2. 对于 Spark 流,我们如何在每个触发间隔选择最新记录和值
  3. 根据 id 和 RecordTime 更新 kudu 表,但如何?
  4. 还有其他我们可以考虑的方法吗?

希望我能充分解释我的问题。 简而言之,我们如何在 Spark Streaming 中实现每个微批次间隔的事件排序?

特别感谢任何可以帮助我的人。

【问题讨论】:

    标签: apache-spark events apache-kafka streaming kudu


    【解决方案1】:

    当您从 Kafka 获取数据时,记住 Kafka 仅在主题 partition 内提供排序保证是很有用的。

    因此,如果您让 Kafka 生产者将同一 ID 的所有消息生成到同一分区中,您就可以解决您的问题。这可以通过 KafkaProducer 中的自定义分区器来实现,或者您只需使用 id 的值作为 Kafka 消息的“关键”部分。

    如果您无法控制 Kafka 生产者,则需要使您的 Spark Streaming 作业有状态。在这里,具有挑战性的部分是定义一个时间范围,您的工作应该等待具有相同 id 的其他消息到达多长时间。只是几秒钟吗?也许几个小时?我的经验是,这可能很难回答,有时答案是“几个小时”,这意味着您需要将状态保持几个小时,这可能会使您的工作 OutOfMemory。

    【讨论】:

    • 谢谢迈克。将使用 id 的值作为“键”
    猜你喜欢
    • 2018-11-28
    • 2012-12-15
    • 2021-01-23
    • 2015-08-21
    • 2020-08-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多