【问题标题】:Storm Kafka Spout Unable to read last off readStorm Kafka Spout 无法读取最后一次读取
【发布时间】:2015-07-30 21:53:40
【问题描述】:

我正在使用storm-kafka-0.9.3从Kafka读取数据并在Storm中处理这些数据。下面是我正在使用的 Kafka Spout。但问题是当我杀死 Storm 集群时,它不会读取在它死亡期间发送的旧数据,它会从最新的偏移量开始读取。

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST);

SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME
        , "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Never should make this true
spoutConfig.forceFromStart=false;
spoutConfig.startOffsetTime =-2;

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
return kafkaSpout;

【问题讨论】:

  • 你能不能试着注释掉spoutConfig.forceFromStart=false;行或设置spoutConfig.forceFromStart=true
  • 试过了,但同样的问题,实际看到假设我在 kafka 中有 100 条消息,Storm 正在处理它,现在假设在第 100 条消息之后,Storm 关闭并且我的 http 端点在 Kafka 中推送了 300 条消息,因为 Storm只处理了 100 条消息,我希望当 Storm 醒来时它应该从它离开的 101 条消息开始处理。
  • 那么到底发生了什么?在您的帖子中,您提到它从最新的偏移量开始读取..这不是您要找的吗?
  • 基本上,当 strom 回来时,它会从 401 而不是 101 开始读取。

标签: apache-kafka apache-storm


【解决方案1】:

谢谢大家, 由于我在Local模式下运行Topology,Storm没有在ZK中存储Offset,当我在Prod模式下运行Topology时,它得到了解决。

苏形

【讨论】:

  • 那么哪些配置可以帮助您解决问题?
【解决方案2】:

你需要设置 spoutConfig.zkServers 和 spoutConfig.zkPort :

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST);
SpoutConfig spoutConfig = new SpoutConfig(hosts,  CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME
    , "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,"test");

spoutConfig.zkPort=Constants.ZOOKEEPER_PORT;  
spoutConfig.zkServers=Constants.ZOOKEEPER_SERVERS;

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
return kafkaSpout;

【讨论】:

    【解决方案3】:

    我相信这可能会发生,因为在拓扑运行时,它使用以下路径 SpoutConfig.zkRoot+ "/" + SpoutConfig.id 将所有状态信息保存到 zookeeper,以便在失败的情况下它可以从 zookeeper 中最后写入的偏移量恢复。

    从文档中得到这个

    重要:在重新部署拓扑时,请确保 SpoutConfig.zkRoot 和 SpoutConfig.id 的设置没有被修改,否则 spout 将无法读取其之前的消费者状态信息(即偏移量)来自 ZooKeeper - 这可能会导致意外行为和/或数据丢失,具体取决于您的用例。

    在您的情况下,SpoutConfig.id 是一个随机值 UUID.randomUUID().toString() 它无法检索最后提交的偏移量。

    同样从同一页面读取

    当拓扑运行一次时,设置 KafkaConfig.startOffsetTime 将不会影响拓扑的后续运行,因为现在拓扑将依赖 ZooKeeper 中的消费者状态信息(偏移量)来确定应该从哪里开始(更准确地说:继续)阅读。如果要强制 spout 忽略 ZooKeeper 中存储的任何消费者状态信息,则应将参数 KafkaConfig.ignoreZkOffsets 设置为 true。如果为 true,则 spout 将始终从 KafkaConfig.startOffsetTime 定义的偏移量开始读取,如上所述

    您可以使用静态id 来查看它是否能够检索。

    【讨论】:

    • 感谢您的回复。仍然无法按预期工作 SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME , "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,"Test");spoutConfig.forceFromStart=true;
    • 据我了解,您正在这样做 - 1)启动 kafka-spout 以读取 100 条消息(2)终止拓扑(3)在队列中再推送 100 条消息(4)再次对拓扑进行分层.. 它从 201 偏移量开始读取 .. 这是正确的吗?
    • 是的,它从 201 开始,但我希望它应该从 101 开始,因为它没有处理最后 100 条消息。顺便说一句,我用过三叉戟也是同样的问题。它总是从 201 开始读取。
    猜你喜欢
    • 1970-01-01
    • 2019-10-11
    • 2015-04-20
    • 2014-07-01
    • 1970-01-01
    • 2016-04-17
    • 2018-06-24
    • 1970-01-01
    • 2021-04-25
    相关资源
    最近更新 更多