【问题标题】:Mapping field names of the output from Spark-Streaming to Elastic Search将 Spark-Streaming 的输出的字段名称映射到 Elastic Search
【发布时间】:2016-05-19 07:33:21
【问题描述】:

我正在使用以下代码将Spark-Streaming 的输出存储到ElasticSearch。我想将 spark-streaming 的输出映射到正确的名称i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)。但正如你所看到的,它目前被映射到 ES 中,如 _1 或 _2 等。 此外,我想在 ES 中索引数据之前放置一些过滤器,即(if PlatFormName = "ubuntu" then index the data)。那么,我该怎么做呢?

 val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_)

            val pageCounts = realTimeAgg.map    
            pageCounts.foreachRDD{ x => 
                    if (x.toLocalIterator.nonEmpty) {       
                        EsSpark.saveToEs(x, "spark/ElasticSearch")
                    }
                }   

            ssc.start()
            ssc.awaitTermination()

ElasticSearch 中的输出:

{
            "_index": "spark",
            "_type": "ElasticSearch",
            "_id": "AVTH0JPgzgtrAOUg77qq",
            "_score": 1,
            "_source": {
               "_1": {
                  "_3": "Amiga",
                  "_2": "AmigaOS 1.3",
                  "_6": "SeaMonkey",
                  "_1": "Usedcar",
                  "_4": 0,
                  "_5": 0
               },
               "_2": 1013
            }
         }

【问题讨论】:

    标签: apache-spark spark-streaming elasticsearch-hadoop


    【解决方案1】:

    弹性搜索文档的键是_1,_2等,因为您存储的是具有(Tuple6,Long)数据类型的PairRDD。

    要保留键,您应该使用案例类作为键。

    val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_)
    

    我假设对象 x 的类是一个案例类,并且您想使用该类的所有字段进行归约(即检查 2 个案例类实例的相等性)。如果该类的所有字段都不是用于相等的类的自然键,那么您有两个选择 -

    1. 为您的案例类覆盖 equals 和 hashCode
    2. 创建另一个仅包含关键字段(您在元组中使用的字段 - (x.key, x.os, x.platform, x.mobile, x.browser))的案例类并映射到该案例第一行lines.map { x => ...}中的类而不是元组。

    您可以在写入 ElasticSearch 之前添加所需的过滤器。

    pageCounts.foreachRDD { x => 
                            if (x.toLocalIterator.nonEmpty) {
                                val y = x.filter(z => z._1.platform == "ubuntu")       
                                EsSpark.saveToEs(y, "spark/ElasticSearch")
                        }
                    }  
    

    PS:如果您使用 (case class, Long) case class 作为键来测试 RDD 对,就像我建议的 lines.map(x => (x, 1)).reduceByKey(_ + _) 一样。有一个与 Spark Shell 相关的错误,即案例类不能正确地作为减少操作的关键类 - jira issue

    【讨论】:

    • 谢谢。我实施了你的第二个建议。能否请您详细说明您在第一个建议中的意思,并以我没有得到的示例为例。 Morover,当您将作业提交到 spark clsuter 时,似乎不会出现此错误,对吧?
    • @Naresh,在第一个选项中,我指的是覆盖现有类中的 equals 和 hashCode 方法(如果需要),例如(此线程建议)[stackoverflow.com/questions/7681183/….是的,该错误仅在 spark-shell 中,而不是在您在集群上运行时。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-07-29
    • 2016-02-05
    • 1970-01-01
    • 2013-08-15
    • 1970-01-01
    • 2012-04-02
    • 2021-04-03
    相关资源
    最近更新 更多