【问题标题】:Challenges while processing Kafka Messages with Spark Streaming使用 Spark Streaming 处理 Kafka 消息时的挑战
【发布时间】:2017-07-13 10:12:05
【问题描述】:

我想实时处理在网络服务器上报告的消息。 Web 服务器上报告的消息属于不同的会话,我想做一些会话级别的聚合。为此,我计划使用 Kafka 前端的 Spark Streaming。甚至在我开始之前,我已经列出了这个架构将面临的一些挑战。熟悉这个生态系统的人可以帮我解决这些问题吗:

  1. 如果每个 Kafka 消息都属于特定会话,如何管理会话亲缘关系,以便同一个 Spark 执行器看到链接到会话的所有消息?
  2. 如何确保 Spark 执行器按照它们在 Kafka 报告的顺序处理属于会话的消息?我们能否在不限制线程数和产生处理开销(如按消息时间戳排序)的情况下以某种方式实现这一目标?
  3. 何时检查会话状态?如果执行器节点崩溃,如何从最后一个检查点恢复状态?在驱动节点崩溃的情况下,如何从最后一个检查点恢复状态?
  4. 如果节点(执行程序/驱动程序)在检查点状态之前崩溃,如何恢复状态?如果 Spark 通过重播消息重新创建状态 RDD,那么它从哪里开始重播 Kafka 消息:病房上的最后一个检查点,还是它处理重新创建分区所需的所有消息? Spark 流式处理是否可以/是否跨多个 Spark 流式处理批次或仅针对当前批次恢复状态,即如果在最后一批期间未完成检查点,是否可以恢复状态?

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming bigdata


    【解决方案1】:

    如果每个Kafka消息都属于一个特定的会话,如何管理 会话亲和性,以便同一个 Spark 执行器看到所有消息 链接到会话?

    Kafka 将主题划分为分区,每个分区一次只能被一个消费者读取,因此需要确保属于一个会话的所有消息都进入同一个分区。分区分配是通过您分配给每条消息的键来控制的,因此实现这一点的最简单方法可能是在发送数据时使用会话 ID 作为键。这样,同一消费者将获得一个会话的所有消息。 但是有一个警告:当消费者加入或离开消费者组时,Kafka 将重新平衡分配给消费者的分区。如果这发生在会话中间,它可以(并且将会)发生,该会话的一半消息发送给一个消费者,而另一半在重新平衡后发送给另一个消费者。为避免这种情况,您需要手动订阅代码中的特定分区,以便每个处理器都有其特定的分区集并且不会更改这些分区。查看 SparkKafka 组件代码中的 ConsumerStrategies.Assign


    如何确保属于会话的消息由 Spark 执行器按照他们在 Kafka 报告的顺序?我们可以吗 在不限制线程数的情况下以某种方式实现这一点 产生处理开销(如按消息时间戳排序)?

    Kafka 保留每个分区的顺序,因此您无需在这里做太多事情。唯一的事情是避免同时从生产者向代理发送多个请求,您可以通过生产者参数 max.in.flight.requests.per.connection 进行配置。只要您将其保持为 1,如果我正确理解您的设置,您应该是安全的。


    何时检查会话状态?国家如何从最后复活 执行程序节点崩溃时的检查点?国家如何复活 在驱动节点崩溃的情况下从最后一个检查点开始?

    我建议阅读Spark Streaming + Kafka Integration Guide 的偏移存储部分,它应该已经回答了很多问题。

    简短的版本是,您可以将上次读取偏移量保存到 Kafka 中,并且在您检查执行程序时绝对应该这样做。这样,每当一个新的 executor 开始处理时,无论它是否从检查点恢复,它都会知道从 Kafka 的哪里读取。


    如果节点(执行程序/驱动程序)之前崩溃,如何恢复状态 检查其状态?如果 Spark 通过重播重新创建状态 RDD 消息然后它从哪里开始重播 Kafka 消息: 病房的最后一个检查点还是它处理所有需要的消息 重新创建分区? Spark 流式处理是否可以恢复状态 跨多个火花流批次或仅针对当前批次 即如果在期间没有进行检查点,是否可以恢复状态 最后一批?

    我在这里的 Spark 知识有点不稳定,但我想说这不是 Kafka/Spark 完成的事情,而是您需要积极影响您的代码的事情。 默认情况下,如果一个新的 Kafka Stream 启动并且没有找到之前提交的偏移量,它将简单地从主题的末尾开始读取,因此它会得到消费者启动后产生的任何消息。如果您需要恢复状态,那么您要么需要知道要从哪个确切偏移量开始重新阅读消息,要么只是重新从头开始阅读。分配分区时,您可以将要读取的偏移量传递到上述 .Assign() 方法中。

    我希望这会有所帮助,我确信这绝不是对所有问题的完整答案,但这是一个相当广泛的工作领域,如果我能提供进一步的帮助,请告诉我。

    【讨论】:

    • 感谢 Sonke Liebau 的详细解答!那里有一些真正有用的信息。我将落实您的一些建议,并将与您分享我的反馈。
    • Liebau:过去几天我根据您的建议进行了研究。到目前为止,我已经能够得到所有问题的答案,期待一个:即使我能够在同一个 Spark 执行器上获取属于一个分区的所有消息,如何确保它们按顺序处理?每个 spark 执行器可能有多个核心,并且可能会尝试并行处理来自批处理的消息,从而打破有序约束。我不想限制分配给每个执行器的核心数量以实现有序处理。
    猜你喜欢
    • 2017-02-06
    • 1970-01-01
    • 2018-06-25
    • 2017-10-30
    • 2015-02-04
    • 2021-05-22
    • 2020-08-04
    • 2019-06-28
    • 1970-01-01
    相关资源
    最近更新 更多