【Question Title】: Make an Apache Storm topology use the latest offset from Kafka
【Posted】: 2018-11-05 14:43:46
【Question】:

I have a KafkaSpout, 2 bolts that process the data, and 2 bolts that store the processed data in MongoDB.

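I'm using Flux to create the topology, and the spout reads data from Kafka. Everything works, but every time I run the topology it processes all the messages in Kafka from the beginning. And once it has processed all of them, it doesn't wait for more messages; it crashes.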

How can I make the Storm topology process only the latest messages?

Here is my topology file (.yaml):

name: "kafka-topology"

components:
# MongoDB mapper
  - id: "block-mapper"
    className: "org.apache.storm.mongodb.common.mapper.SimpleMongoMapper"
    configMethods:
      - name: "withFields"
        args: # The following are the tuple fields to map to a MongoDB document
          - ["block"]
# MongoDB mapper
  - id: "transaction-mapper"
    className: "org.apache.storm.mongodb.common.mapper.SimpleMongoMapper"
    configMethods:
      - name: "withFields"
        args: # The following are the tuple fields to map to a MongoDB document
          - ["transaction"]

  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "blockdata"
      # zkRoot
      - ""
      # id
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      - name: "ignoreZkOffsets"
        value: false



config:
  topology.workers: 1
  # ...

# spout definitions
spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"
    parallelism: 1

# bolt definitions
bolts:
  - id: "blockprocessing-bolt"
    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
    constructorArgs:
      # command line
      - ["python", "process-bolt.py"]
      # output fields
      - ["block"]
    parallelism: 1
    # ...
  - id: "transprocessing-bolt"
    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
    constructorArgs:
      # command line
      - ["python", "trans-bolt.py"]
      # output fields
      - ["transaction"]
    parallelism: 1
    # ...

  - id: "mongoBlock-bolt"
    className: "org.apache.storm.mongodb.bolt.MongoInsertBolt"
    constructorArgs:
      - "mongodb://172.25.33.205:27017/testdb"
      - "block"
      - ref: "block-mapper"
    parallelism: 1
    # ...

  - id: "mongoTrans-bolt"
    className: "org.apache.storm.mongodb.bolt.MongoInsertBolt"
    constructorArgs:
      - "mongodb://172.25.33.205:27017/testdb"
      - "transaction"
      - ref: "transaction-mapper"
    parallelism: 1
    # ...



  - id: "log"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1
    # ...

#stream definitions
# stream definitions define connections between spouts and bolts.
# note that such connections can be cyclical
# custom stream groupings are also supported

streams:


  - name: "kafka --> block-Processing" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "blockprocessing-bolt"
    grouping:
      type: SHUFFLE

  - name: "kafka --> transaction-processing" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "transprocessing-bolt"
    grouping:
      type: SHUFFLE

  - name: "block --> mongo"
    from: "blockprocessing-bolt"
    to: "mongoBlock-bolt"
    grouping:
      type: SHUFFLE

  - name: "transaction --> mongo"
    from: "transprocessing-bolt"
    to: "mongoTrans-bolt"
    grouping:
      type: SHUFFLE

I have tried adding properties to spoutConfig so that it only fetches the latest messages, like this:

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      - name: "startOffsetTime"
        ref: "EarliestTime"
      - name: "forceFromStart"
        value: false

But no matter what I put as the ref for startOffsetTime, it produces this error:

Exception in thread "main" java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value

【Question Discussion】:

    Tags: apache-kafka apache-storm


    【Answer 1】:

    You need to set startOffsetTime to kafka.api.OffsetRequest.LatestTime. As you can see at https://github.com/apache/storm/tree/64af629a19a82591dbf3428f7fd6b02f39e0723f/external/storm-kafka#kafkaconfig, the default setting starts from the earliest available offset.
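
A sketch of how this might look in the Flux file from the question. It assumes the storm-kafka 1.x `SpoutConfig`, where `kafka.api.OffsetRequest.LatestTime()` returns `-1` (and `EarliestTime()` returns `-2`), so the constant can be written as a plain number. Per the storm-kafka README linked above, `startOffsetTime` only takes effect when the spout finds no offset already committed to ZooKeeper under `zkRoot + "/" + id`; setting `ignoreZkOffsets` to `true` makes it apply even after an earlier run has stored offsets.

```yaml
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      # -1 == kafka.api.OffsetRequest.LatestTime(): begin at the end of the topic
      - name: "startOffsetTime"
        value: -1
      # Ignore offsets that previous runs committed to ZooKeeper,
      # so startOffsetTime is honored when the topology is resubmitted
      - name: "ignoreZkOffsets"
        value: true
```

Once the start position is settled, switching `ignoreZkOffsets` back to `false` lets a resubmitted topology resume from its committed offsets instead of skipping whatever arrived while it was down.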

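    The exception you're getting seems unrelated. It looks like a Curator/ZooKeeper incompatibility.

    Edit: I think you're hitting this issue: https://issues.apache.org/jira/browse/STORM-2978. 1.2.2 should be out soon; please try upgrading once it's released.

    Edit 2: If you want to work around it without upgrading, edit your topology's pom so that it depends on ZooKeeper 3.4 instead of 3.5.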

    【Discussion】:

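    • Thanks, Stig. I'm creating the topology from a YAML file. How do I set startOffsetTime to kafka.api.OffsetRequest.LatestTime in the YAML file? I've searched for hours but can't figure it out.
    • I've edited the spoutConfig in the question; I can't set the startOffsetTime property in the YAML topology. Please see the update. Thanks a lot for your help :)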
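
The `null` in the exception may come from how Flux resolves `ref`: it looks up another entry under `components` by its `id`, and since no component named `EarliestTime` is defined, the lookup appears to yield `null`, which cannot be assigned to the primitive `long` field `startOffsetTime`. Passing a literal with `value:` sidesteps the lookup. A minimal fix for the `properties` section of the updated snippet, assuming the same storm-kafka 1.x classes (`-1` standing in for `kafka.api.OffsetRequest.LatestTime()`); it drops `forceFromStart`, which in storm-kafka 1.x appears to have been superseded by `ignoreZkOffsets`:

```yaml
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      # value (a literal) instead of ref (a component id);
      # -1 is what kafka.api.OffsetRequest.LatestTime() returns
      - name: "startOffsetTime"
        value: -1
```

YAML parses `-1` as an `Integer`; because `startOffsetTime` is a public field with no setter, Flux appears to assign it through Java reflection, which widens the `Integer` to `long`.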