【Question title】: Apache Storm JoinBolt
【Posted】: 2019-01-28 16:56:23
【Question】:

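I am trying to join two Kafka data streams (read with Kafka spouts) into one using JoinBolt, following the code snippet at http://storm.apache.org/releases/1.1.2/Joins.html.

The documentation says that each of JoinBolt's incoming data streams must be fields-grouped on a single field, and that a stream can only be joined with other streams using the field on which it has been FieldsGrouped.

Code snippet: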

    KafkaSpout kafka_spout_1 = SpoutBuilder.buildSpout("127.0.0.1:2181","test-topic-1", "/spout-1", "spout-1");//String zkHosts, String topic, String zkRoot, String spoutId

    KafkaSpout kafka_spout_2 = SpoutBuilder.buildSpout("127.0.0.1:2181","test-topic-2", "/spout-2", "spout-2");//String zkHosts, String topic, String zkRoot, String spoutId

    topologyBuilder.setSpout("kafka-spout-1", kafka_spout_1, 1);

    topologyBuilder.setSpout("kafka-spout-2", kafka_spout_2, 1);

    JoinBolt joinBolt = new JoinBolt("kafka-spout-1", "id")
            .join("kafka-spout-2", "deptId", "kafka-spout-1")
            .select("id,deptId,firstName,deptName")
            .withTumblingWindow(new Duration(10, TimeUnit.SECONDS));

    topologyBuilder.setBolt("joiner", joinBolt, 1)
            .fieldsGrouping("kafka-spout-1", new Fields("id"))
            .fieldsGrouping("kafka-spout-2", new Fields("deptId"));

kafka-spout-1 sample record --> {"id" : 1 ,"firstName" : "Alyssa" , "lastName" : "Parker"}

kafka-spout-2 sample record --> {"deptId" : 1 ,"deptName" : "Engineering"}

When I deploy the topology with the code snippet above, I get the following exception:

[main] WARN  o.a.s.StormSubmitter - Topology submission exception: Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"}
java.lang.RuntimeException: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
    at BuildTopology.runTopology(BuildTopology.java:71)
    at Main.main(Main.java:6)
Caused by: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
    at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
    at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
    at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
    at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
    at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
    ... 4 more

How can I solve this?

Thank you, any help would be much appreciated.

【Comments】:

    Tags: apache-kafka apache-storm


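    【Solution 1】:

    If you are doing new development, consider using storm-kafka-client instead of storm-kafka. storm-kafka is deprecated.

    Does the spout actually emit a field named "deptId"?

    Your configuration snippet doesn't mention that you set SpoutConfig.scheme, and your sample records seem to imply that you are emitting a JSON document that contains a "deptId" field.

    Storm knows nothing about JSON or about the contents of the strings coming out of the spout. You need to define a scheme that makes the spout emit "deptId" as its own field, separate from the rest of the record.

    Here is the relevant snippet from one of the built-in schemes, which emits the message, partition, and offset in separate fields: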

        @Override
        public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
            String stringMessage = StringScheme.deserializeString(message);
            return new Values(stringMessage, partition.partition, offset);
        }

        @Override
        public Fields getOutputFields() {
            return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET);
        }
    

    See https://github.com/apache/storm/blob/v1.2.2/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java for reference.
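For the record format in the question, the deserialization step of such a scheme could look roughly like the sketch below. It is plain Java with no Storm dependency so that it runs on its own; the class name `DeptRecordScheme` is hypothetical, and the regex matching is an assumption that only suits the flat sample record (a JSON library would be the safer choice). In a real topology the two methods would presumably live in a class implementing `org.apache.storm.spout.Scheme`, returning `Values` and `Fields` instead of plain lists.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of the logic a custom scheme for test-topic-2 would need. */
public class DeptRecordScheme {
    // Naive patterns for the flat sample record; not a general JSON parser.
    private static final Pattern DEPT_ID = Pattern.compile("\"deptId\"\\s*:\\s*(\\d+)");
    private static final Pattern DEPT_NAME = Pattern.compile("\"deptName\"\\s*:\\s*\"([^\"]*)\"");

    /** Turns the raw Kafka message into one value per declared output field. */
    public static List<Object> deserialize(ByteBuffer message) {
        // duplicate() leaves the caller's buffer position untouched.
        String json = StandardCharsets.UTF_8.decode(message.duplicate()).toString();
        Matcher id = DEPT_ID.matcher(json);
        Matcher name = DEPT_NAME.matcher(json);
        if (!id.find() || !name.find()) {
            throw new IllegalArgumentException("Record lacks deptId or deptName: " + json);
        }
        return Arrays.<Object>asList(Long.parseLong(id.group(1)), name.group(1));
    }

    /** Field names, in the same order as the values returned by deserialize. */
    public static List<String> getOutputFields() {
        return Arrays.asList("deptId", "deptName");
    }

    public static void main(String[] args) {
        String sample = "{\"deptId\" : 1 ,\"deptName\" : \"Engineering\"}";
        ByteBuffer raw = ByteBuffer.wrap(sample.getBytes(StandardCharsets.UTF_8));
        // prints: [deptId, deptName] -> [1, Engineering]
        System.out.println(getOutputFields() + " -> " + deserialize(raw));
    }
}
```

Once the spout declares "deptId" as an output field, the fieldsGrouping on "deptId" has an existing field to refer to. The scheme for the other topic would need to emit "id" in the same way, and it is probably safest to emit the join key as the same Java type on both sides.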

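    An alternative to doing this with a scheme is to put a bolt between the spout and the JoinBolt that extracts "deptId" from the record and emits it as a field alongside the record.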
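The transformation such an intermediate bolt performs can be sketched in plain Java as below. The class name `JoinKeyExtractor` and the idea of emitting the pair (key, raw record) are assumptions for illustration, and the regex only handles a top-level numeric field. In a real bolt, `execute` would presumably call this on the incoming string and emit the result, while `declareOutputFields` would declare the key field (for example "deptId") plus a field for the raw record.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper showing what an extractor bolt would compute per tuple. */
public class JoinKeyExtractor {

    /**
     * Returns [key, record]: the numeric value of fieldName pulled out of the
     * raw JSON string, followed by the untouched record.
     */
    public static List<Object> extract(String record, String fieldName) {
        // Naive match for a top-level numeric field; use a JSON parser in real code.
        Pattern pattern = Pattern.compile("\"" + Pattern.quote(fieldName) + "\"\\s*:\\s*(\\d+)");
        Matcher matcher = pattern.matcher(record);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Field " + fieldName + " not found in: " + record);
        }
        return Arrays.<Object>asList(Long.parseLong(matcher.group(1)), record);
    }

    public static void main(String[] args) {
        String employee = "{\"id\" : 1 ,\"firstName\" : \"Alyssa\" , \"lastName\" : \"Parker\"}";
        String dept = "{\"deptId\" : 1 ,\"deptName\" : \"Engineering\"}";
        // Both streams now carry their join key as a separate value.
        System.out.println(extract(employee, "id").get(0));
        System.out.println(extract(dept, "deptId").get(0));
    }
}
```

With this approach the JoinBolt and its fieldsGrouping calls would refer to the component ids of the extractor bolts rather than to the spouts, since the extractor bolts are the components that declare "id" and "deptId".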
