【问题标题】:get topic from kafka message in spark从火花中的kafka消息中获取主题
【发布时间】:2016-08-14 17:11:17
【问题描述】:

在我们的 spark-streaming 工作中,我们从 kafka 读取流式消息。

为此,我们使用返回JavaPairInputDStreamfromKafkaUtils.createDirectStream API。

通过以下方式从 kafka 读取消息(来自三个主题 - test1、test2、test3):

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );

我们希望以不同的方式处理来自每个主题的消息,为了实现这一点,我们需要知道每条消息的主题名称。

所以我们执行以下操作:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

这是SplitToLinesFunction的实现:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

问题在于tuple2._1 为空,我们假设tuple2._1 将包含一些元数据,例如消息来自的主题/分区的名称。

但是,当我们打印tuple2._1 时,它是空的。

我们的问题 - 有没有办法在 kafka 中发送主题名称,以便在 spark-streaming 代码中,tuple2._1 将包含它(而不是 null)?

请注意,我们还尝试从 spark-streaming kafka-integration tutorial 中提到的 DStream 中获取主题名称:

但它返回发送到KafkaUtils.createDirectStream 的所有主题,而不是消息(属于当前RDD)来自的特定主题。

所以它并不能帮助我们识别 RDD 中的消息是从哪里发送的主题的名称。

编辑

响应大卫的回答 - 我尝试像这样使用MessageAndMetadata

        Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
        topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

        class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
        {

            @Override
            public String call(MessageAndMetadata<String, String> v1)
                    throws Exception {
                // nothing is printed here
                System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
                return v1.topic();
            }

        }

        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
        messages.foreachRDD(new VoidFunction() {

            @Override
            public void call(Object t) throws Exception {
                JavaRDD<String> rdd = (JavaRDD<String>)t;
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // here all the topics kafka listens to are printed, but that doesn't help
                for (OffsetRange offset : offsets) {
                    System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
                }
            }
        });

问题是MessageAndMetadataFunction.call 方法中没有打印任何内容。为了在 MessageAndMetadataFunction.call 方法中获取该 RDD 的相关主题,我应该修复什么?

【问题讨论】:

  • “这里什么都没有打印”是什么意思?甚至没有“topic =”部分,或者该部分打印但值为空。
  • 如果没有,那么您应该查看您的 YARN 日志,或者您正在运行的任何集群。对我来说,/usr/local/hadoop/logs/userLogs/ 中有日志文件从您的执行者那里捕获 stdout
  • 抱歉 -- 我现在知道问题所在了。这是因为您的MessageAndMetadataFunction 必须将主题和消息都返回到一条记录中。现在您只返回主题,而不是消息本身。这就是为什么您一遍又一遍地打印出主题的原因——因为这就是你从MessageAndMetadataFunction 返回的内容——返回两者,你将拥有两者。
  • 编辑了我的答案以使其更清晰
  • 但是v1.topic()v1.partition 怎么可以一起返回呢?创建一个new Tuple2&lt;String, String&gt;(v1.topic(), v1.partition()) 并返回它?

标签: apache-spark apache-kafka spark-streaming


【解决方案1】:

使用将messageHandler 函数作为参数的createDirectStream 版本之一。这是我的工作:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
  ssc,
  kafkaParams,
  getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap,
  (msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
)

那里有些东西对你没有任何意义——相关的部分是

(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}

如果您不熟悉Scala,函数所做的只是返回一个包含msg.topicmsg.messageTuple2。您的函数需要返回这两个,以便您在下游使用它们。您可以只返回整个 MessageAndMetadata 对象,这会为您提供一些其他有趣的字段。但是如果你只想要topicmessage,那么就使用上面的。

【讨论】:

  • 嘿,你好像有一个额外的牙套,请你纠正一下。
  • @David 您能否提供一个使用 Scala 的工作或详细示例。因为我对来自Offsets,messageHandler的这些参数感到困惑。谢谢你!
【解决方案2】:

Kafka integration guide 底部有一个从消息中提取主题的示例。

Java中的相关代码:

 // Hold a reference to the current offset ranges, so it can be used downstream
 final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

 directKafkaStream.transformToPair(
   new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
     @Override
     public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
       OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
       offsetRanges.set(offsets);
       return rdd;
     }
   }
 ).map(
   ...
 ).foreachRDD(
   new Function<JavaPairRDD<String, String>, Void>() {
     @Override
     public Void call(JavaPairRDD<String, String> rdd) throws IOException {
       for (OffsetRange o : offsetRanges.get()) {
         System.out.println(
           o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
         );
       }
       ...
       return null;
     }
   }
 );

这可能会被折叠成更紧凑的内容,只需要主题而不是其他内容。

【讨论】:

  • 我试了一下,它打印了kafka监听的所有topic,而不是只打印与当前RDD相关的topic。例如- 如果我收听 3 个主题 - test1、test2、test3 - 并且消息仅来自 test1,那么此代码将为每个 RDD 打印 test1、test2、test3。所以这段代码对我没有帮助。
猜你喜欢
  • 1970-01-01
  • 2015-08-01
  • 1970-01-01
  • 2017-03-27
  • 2018-02-12
  • 2019-02-10
  • 2019-09-20
  • 2019-10-11
  • 2021-10-18
相关资源
最近更新 更多