【问题标题】:How to determine topic for a Kafka message如何确定 Kafka 消息的主题
【发布时间】:2018-02-04 05:55:20
【问题描述】:

我有如下代码可以从 Kafka 读取并保存到 Elasticsearch,我正在使用 spark 流:

JavaDStream<String> liness = messages.map(new Function<ConsumerRecord<String, String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
            System.out.println(kafkaRecord.topic() + kafkaRecord.value());
            return kafkaRecord.value();
        }
    });

        JavaEsSparkStreaming.saveJsonToEs(lines, "events/redict");
        jssc.start();

我的问题是我需要根据主题名称使 "events/redict" 动态化,我也可以在 messages.map 调用中获取主题名称但是如何在此处添加动态?

【问题讨论】:

  • 如何从消费者记录映射到一个由两个字段组成的新对象,值的字符串和取决于主题的值?
  • 如果您只想将 Kakfa 记录保存到 Elastic,为什么不使用 Logstash 或 Kafka Connect?
  • 谢谢,板球我更喜欢 Spark。
  • 是的,Dominik,这是一种方法,但我正在寻找一种我必须更改消息的方法。
  • 所以您宁愿编程并将其部署到某种类型的 Spark 集群,然后使用内置于 ELK 堆栈或 Kafka 中的工具?

标签: java apache-spark apache-kafka spark-streaming


【解决方案1】:

您可以通过为每个主题创建一个流,或将您的 messages 流过滤到 eventsredict RDD 中来添加“动态”。

您将这两个单独保存到适当的 ES 索引中。

【讨论】:

    猜你喜欢
    • 2018-07-18
    • 2021-08-07
    • 2017-05-29
    • 1970-01-01
    • 1970-01-01
    • 2019-06-23
    • 1970-01-01
    • 2015-12-07
    • 1970-01-01
    相关资源
    最近更新 更多