【问题标题】:Kafka Storm spout changing topology and consuming from the old offsetKafka Storm spout 更改拓扑并从旧偏移量消费
【发布时间】:2013-12-26 09:39:39
【问题描述】:

我正在使用 kafka spout 来消费消息。但是如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始? Kafka spout 让我们指定从哪里消费的时间戳,但我怎么知道时间戳?

【问题讨论】:

    标签: apache-storm apache-kafka


    【解决方案1】:

    基本上,事件的顺序是:

    1. 第一次启动拓扑,从以下属性开始读取:

      forceFromStart = true
      
      startOffsetTime = -2
      

    上面的道具会强制它从话题的开头开始。请记住同时拥有这两个属性,因为forceFromStart 告诉storm 读取startOffsetTime 属性并使用设置的值来确定从哪里开始读取,并忽略zookeeper 偏移量。

    从现在开始,您的拓扑将运行,zookeeper 将保持偏移量。如果你的工人死了,它将由主管启动,并开始从 zookeeper 中的偏移量读取。

    1. 现在,如果您想重新启动拓扑,并且想从关闭前停止的位置读取,请使用以下属性并重新启动拓扑:

      forceFromStart = false
      

    通过上述属性,您告诉storm 不要读取startOffsetTime 值,而是使用在您关闭拓扑之前已维护的zookeeper 偏移量。

    从现在开始,每次重新启动拓扑时,它都会从原来的位置读取。

    1. 如果您想重新启动拓扑并从主题的头部/顶部读取,请使用以下属性并重新启动拓扑:

      forceFromStart = true
      
      startOffsetTime = -1
      

    通过上述属性,您告诉storm忽略zookeeper偏移并从作为主题提示的最新偏移开始。

    【讨论】:

      【解决方案2】:

      如果您使用的是 KafkaSpout,请确保以下几点:

      1. 在您的 SpoutConfig “id” 和 “zkroot” 之后不要更改 重新部署新版本的拓扑。风暴使用“ zkroot”, “id” 将主题偏移存储到zookeeper中
      2. KafkaConfig.forceFromStart 设置为 false。

      KafkaSpout 将偏移量存储到 Zookeeper 中。如果在 KafkaSpout 的 KafkaConfig 中将 forceFromStart 设置为 true(首次部署拓扑时可能会出现这种情况),则在重新部署期间要非常小心,它将忽略存储的 zookeeper 偏移量。确保将其设置为 false。

      考虑编写拓扑,以便在执行拓扑的 main() 方法时从属性文件中读取 KafkaConfig.forceFromStart 值。这将允许您的管理员控制是否重播 Kafka 消息。

      【讨论】:

        【解决方案3】:

        spoutConfig.forceStartOffsetTime(-1);

        它将选择围绕该时间戳写入的最新偏移量以开始消费。你可以 通过传入 -1 强制喷口始终从最新的偏移量开始,您可以强制 它通过传入-2从最早的偏移量开始。

        references

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-06-25
          • 1970-01-01
          • 1970-01-01
          • 2018-11-05
          • 2023-04-02
          • 1970-01-01
          相关资源
          最近更新 更多