【问题标题】:Apache Beam KinesisIO Java processing pipeline - application state, error handling & fault-tolerance?Apache Beam KinesisIO Java 处理管道 - 应用程序状态、错误处理和容错?
【发布时间】:2020-08-21 07:31:11
【问题描述】:

我正在开发我的第一个 Apache 光束管道,以处理来自 AWS Kinesis 的数据流。我熟悉 Kafka 的概念,了解它如何处理消费者的偏移/状态,并且有实施 apachestorm/spark 处理的经验。

阅读完文档后,我成功地创建了一个工作梁管道,使用 KinesisIO Java SDK 监听 AWS Kinesis 数据流以转换和打印消息。但是,想知道有关如何在 apache Beam w.r.t. 中处理以下区域的任何参考实现或指针。 KinesisIO -

  1. 如何在 Kinesis 流中唯一标识消费者应用程序(类似于 Kafka 中的消费者组 ID)- 我说得对吗?它基于 apache Beam 的应用程序名称,并且任何使用 KCL 的消费者都会在 DynamoDB 中跟踪其状态;总是如此吗?对于 apache beam KinesisIO 也是如此?

  2. 如何强制消费者开始处理数据流 w.r.t.它的分片从之前中断的地方开始,即在消费者重新启动或处理过程中出现任何错误异常的情况下(类似于 Kakfa 中的每个消费者 groupId 的偏移量管理)。 InitialPositionInStream.TRIM_HORIZON 始终从最早的可用数据流开始,即使我在处理了来自 Kinesis 流的少量数据后重新启动管道。

  3. ack 如何在 Kinesis 数据流中工作,即消费者如何确认/更新使用 getRecords() 提取的数据流在进一步增加分片中的序列/位置之前处理的检查点?有什么方法可以控制消费者应用程序中的这些行为,即何时成功确认消息以保存应用程序状态并在消费者重新启动时从这些位置开始?

  4. 处理数据流时业务异常(在管道中的任何阶段)对从 Kinesis 流中提取的后续数据的影响,即应用程序是继续提取数据还是停止流程。

【问题讨论】:

    标签: java apache-beam amazon-kinesis apache-beam-io


    【解决方案1】:
    1. KinesisIO.Read 在后台利用 AWS SDK 从 Kinesis 读取数据,并定期检索 Shard Iterator 的更新以从 Kinesis 分片中获取记录。

    2. 你试过ShardIteratorType#LATEST吗?

    3. 在这里查看我的答案:https://stackoverflow.com/a/62349838/10687325

    4. 如果是未知异常,则管道将停止。

    【讨论】:

    • 谢谢!对于#1,它如何在 Shard 中的最后一个读取记录/序列号上维护对 Shard Iterator 的更新。它是在 DynamoDB 中还是在内存中(使用 Direct-runner 和 GCP Dataflow runner 进行了尝试)?对于#2,是的,我已经尝试过了,但我的要求是确保消费者应用程序始终从上次读取位置读取(恢复),以防消费者应用程序关闭一段时间并重新启动。
    猜你喜欢
    • 2020-06-15
    • 2017-08-30
    • 1970-01-01
    • 1970-01-01
    • 2019-06-22
    • 2019-02-02
    • 1970-01-01
    • 1970-01-01
    • 2021-08-13
    相关资源
    最近更新 更多