【问题标题】:is rdd.contains function in spark-scala expensivespark-scala 中的 rdd.contains 函数是否昂贵
【发布时间】:2018-03-08 12:40:50
【问题描述】:

我在火花流中从 Kafka 流中获得数百万条消息。有 15 种不同类型的消息。消息来自一个主题。我只能通过内容来区分消息。所以我使用 rdd.contains 方法来获取不同类型的 rdd。

示例消息

{"a":"foo", "b":"bar","type":"first" .......}
{"a":"foo1", "b":"bar1","type":"second" .......}
{"a":"foo2", "b":"bar2","type":"第三个".......}
{"a":"foo", "b":"bar","type":"first" .......}
.............
.............
.........
等等

代码

DStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val rdd_first = rdd.filter {
      ele => ele.contains("First")
    }
    if (!rdd_first.isEmpty()) {
      insertIntoTableFirst(hivecontext.read.json(rdd_first)) 
    }
    val rdd_second = rdd.filter {
      ele => ele.contains("Second")
    }
    if (!rdd_second.isEmpty()) {
     insertIntoTableSecond(hivecontext.read.json(rdd_second))
    }
         .............
         ......
    same way for 15 different rdd

有没有办法从 kafka 主题消息中获取不同的 rdd?

【问题讨论】:

    标签: scala apache-kafka spark-streaming contains


    【解决方案1】:

    没有rdd.contains。这里使用的函数contains应用于RDD中的Strings。

    喜欢这里:

    val rdd_first = rdd.filter {
      element => element.contains("First") // each `element` is a String 
    }
    

    此方法不健壮,因为String中的其他内容可能满足比较,导致错误。

    例如

    {"a":"foo", "b":"bar","type":"second", "c": "first", .......}
    

    解决此问题的一种方法是首先将 JSON 数据转换为适当的记录,然后对这些记录应用分组或过滤逻辑。为此,我们首先需要数据的模式定义。使用模式,我们可以将记录解析为 json 并在此之上应用任何处理:

    case class Record(a:String, b:String, `type`:String)
    
    import org.apache.spark.sql.types._
    val schema = StructType(
                   Array(
                    StructField("a", StringType, true),
                    StructField("b", StringType, true),
                    StructField("type", String, true)
                   )
                 )
    
    val processPerType: Map[String, Dataset[Record] => Unit ] = Map(...) 
    
    stream.foreachRDD { rdd =>
      val records = rdd.toDF("value").select(from_json($"value", schema)).as[Record]
      processPerType.foreach{case (tpe, process) =>
          val target = records.filter(entry => entry.`type` == tpe)
          process(target)
      }
    } 
    

    这个问题没有具体说明需要对每种类型的记录应用什么样的逻辑。这里介绍的是一种解决问题的通用方法,其中任何自定义逻辑都可以表示为函数Dataset[Record] => Unit

    如果逻辑可以表示为聚合,那么Dataset 聚合函数可能会更合适。

    【讨论】:

    • 我必须将数据存储在配置单元中。在 hive 中创建了 15 个不同的表。更新的问题。实际上,单一类型的 JSON 中有超过 50 列。所以我必须创建 15 个案例类。除了创建案例类之外,还有其他方法吗??
    • @KishoreKumarSuthar 在使用初始 case class(按照 Spark 术语)“结构化”数据后,您可以对数据进行预测以匹配特定表 (val tableProjection1 = records select($"column", $"column", ...) where ($"type" === ...)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-13
    • 2012-03-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多