【Question title】:Apache Storm Flux Kafka Spout Record Translator Exception
【Posted】:2018-09-09 16:21:47
【Question】:

I am using Storm Flux 1.2.2 to deploy a topology. I pass a record translator to KafkaSpoutConfig (following https://github.com/apache/storm/blob/master/flux/flux-examples/src/main/resources/kafka_spout.yaml), but I get the following exception:

java.lang.ClassNotFoundException: org.apache.storm.flux.examples.OnlyValueRecordTranslator
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:342)
    at org.apache.storm.flux.FluxBuilder.buildComponents(FluxBuilder.java:421)
    at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:101)
    at com.gsl.saf.storm.flux.manager.CustomFluxManager.runFlux(CustomFluxManager.java:87)
    at com.gsl.saf.storm.flux.manager.StromTopologyManager.SubmitTopology(StromTopologyManager.java:185)
    at com.gsl.saf.storm.flux.manager.StromTopologyManager.submitTopology(StromTopologyManager.java:299)
    at com.gsl.saf.stormflux.App.main(App.java:36)

Flux YAML configuration:

components:
  - id: "windowDuration"
    className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
    constructorArgs:
      - 10
      - "SECONDS"
  - id: "onlyValueRecordTranslator"
    className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"

  - id: "spoutConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    constructorArgs:
      - "localhost:9092"
      - ["test_topic"]
    properties:
      - name: "firstPollOffsetStrategy"
        value: EARLIEST
      - name: "recordTranslator"
        ref: "onlyValueRecordTranslator"
    configMethods:
      - name: "setProp"
        args:
          - {
              "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer",
              "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
            }

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      - ref: "spoutConfigBuilder"
config:
  topology.workers: 1

# spout definitions
spouts:
  - id: "spout-1"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"

POM configuration:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>flux-core</artifactId>
    <version>1.2.2</version>
</dependency>

Thanks

【Comments】:

    Tags: apache-storm apache-storm-flux


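    【Solution 1】:

    OnlyValueRecordTranslator exists only in the flux-examples project, which is not among the dependencies listed in the question. See https://github.com/apache/storm/blob/master/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java. You can copy it into your project if you like.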
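The stack trace in the question shows FluxBuilder.buildObject calling Class.forName on the className taken from the YAML, so the exception means that class is not visible to the JVM that builds the topology. A minimal sketch of a check you could run before building, assuming nothing beyond the JDK; ClasspathCheck and isOnClasspath are hypothetical names for illustration and are not part of Storm or Flux:

```java
// Hypothetical helper for illustration; not part of Storm or Flux.
class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class name that failed to load in the question
        String name = "org.apache.storm.flux.examples.OnlyValueRecordTranslator";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If the check reports false after you copy the class into your own project, the className in the YAML probably still points at the old package and should be updated to the package you copied it into.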

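    【Discussion】:

    • I copied OnlyValueRecordTranslator.java from flux-examples into my project. While debugging I found that the findSetter method of the FluxBuilder class returns setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields), whereas we expect setRecordTranslator(RecordTranslator<K, V> translator), which is why a "wrong argument" exception was thrown. So I changed the YAML file to set the record translator through configMethods instead, and it works: configMethods: - name: "setRecordTranslator" args: [ref: "onlyValueRecordTranslator"]
    • When I use the same YAML, I get "java.lang.IllegalArgumentException: argument type mismatch". Can you post the YAML that worked for you?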
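The overload mix-up described in the first comment can be reproduced with plain Java reflection. The sketch below is not Flux's actual findSetter code; DemoBuilder and SetterLookup are hypothetical stand-ins that only show what happens when a one-value property is applied to a two-parameter overload, and how a lookup that also checks the parameter count avoids it:

```java
import java.lang.reflect.Method;
import java.util.List;
import java.util.function.Function;

// Hypothetical builder with two overloads of the same setter; not Storm code.
class DemoBuilder {
    String chosen = "none";

    public void setRecordTranslator(Object translator) {
        chosen = "one-arg";
    }

    public void setRecordTranslator(Function<String, List<Object>> func, List<String> fields) {
        chosen = "two-arg";
    }
}

// Hypothetical lookup helper; illustrates the symptom, not Flux internals.
class SetterLookup {

    /** Finds a public method by name and parameter count. */
    static Method find(Class<?> type, String name, int argCount) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == argCount) {
                return m;
            }
        }
        throw new IllegalStateException("no method " + name + " with " + argCount + " parameter(s)");
    }

    /** Invokes the overload with the given parameter count using a single argument. */
    static String invokeWithOneArg(int overloadArgCount) {
        DemoBuilder builder = new DemoBuilder();
        Method setter = find(DemoBuilder.class, "setRecordTranslator", overloadArgCount);
        try {
            setter.invoke(builder, new Object());
            return builder.chosen;
        } catch (IllegalArgumentException e) {
            // Raised by reflection when the argument list does not fit the method
            return "IllegalArgumentException";
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("two-parameter overload: " + invokeWithOneArg(2));
        System.out.println("one-parameter overload: " + invokeWithOneArg(1));
    }
}
```

This only mirrors the first comment's diagnosis. The "argument type mismatch" in the second comment is a different reflection failure, raised when the argument count fits but a value's type does not, so it may have a separate cause.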