【问题标题】:How can I append timestamp to rdd and push to elasticsearch如何将时间戳附加到 rdd 并推送到 elasticsearch
【发布时间】:2018-04-07 07:29:17
【问题描述】:

我是 spark 流和弹性搜索的新手,我正在尝试使用 spark 从 kafka 主题中读取数据并将数据存储为 rdd。在 rdd 中,我想在新数据到来时附加时间戳,然后推送到 elasticsearch。

lines.foreachRDD(rdd -> {
        if(!rdd.isEmpty()){
        // rdd.collect().forEach(System.out::println);
        String timeStamp = new 
        SimpleDateFormat("yyyy::MM::dd::HH::mm::ss").format(new Date());
        List<String> myList = new ArrayList<String>(Arrays.asList(timeStamp.split("\\s+")));
        List<String> f = rdd.collect();


        Map<List<String>, ?> rddMaps = ImmutableMap.of(f, 1);
        Map<List<String>, ?> myListrdd = ImmutableMap.of(myList, 1);

        JavaRDD<Map<List<String>, ?>> javaRDD = sc.parallelize(ImmutableList.of(rddMaps));

        JavaEsSpark.saveToEs(javaRDD, "sample/docs");
        }
    });

【问题讨论】:

    标签: java elasticsearch time apache-kafka spark-streaming


    【解决方案1】:

    如果您使用 Spark Streaming 只是为了以一种更简洁的方式将数据从 Kafka 传输到 Elasticsearch(并且不需要任何编码),那么您应该使用 Kafka Connect。

    有一个Elasticsearch Kafka Connect sink。根据您想要对时间戳执行的操作(例如,用于索引路由或添加为字段),您可以使用 Single Message Transforms(有一个示例 here)。

    【讨论】:

      【解决方案2】:

      火花?

      据我了解,Spark Streaming 用于实时流数据计算,如mapreducejoinwindow。好像没必要用这么强大的工具,我们只需要给事件加个时间戳就好了。

      Logstash?

      如果是这种情况,Logstash 可能更适合我们的情况。

      Logstash 会记录事件到来时的时间戳,它还有persistent queueDead Letter Queues 确保数据弹性。原生支持推送数据到ES(毕竟属于系列产品),推送数据非常方便。

      output {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "logstash-%{type}-%{+YYYY.MM.dd}"
        }
      }
      

      更多

      • 关于 Logstash 的更多信息,here 是介绍。
      • here 是一个示例 logstash 配置文件。

      希望这有帮助。

      参考

      【讨论】:

        猜你喜欢
        • 2018-07-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-01-31
        • 1970-01-01
        • 1970-01-01
        • 2018-02-28
        • 2015-11-06
        相关资源
        最近更新 更多