【问题标题】:String filter using Spark UDF使用 Spark UDF 的字符串过滤器
【发布时间】:2017-04-20 01:12:28
【问题描述】:

输入.csv:

200,300,889,767,9908,7768,9090

300,400,223,4456,3214,6675,333

234,567,890

123,445,667,887

我想要的: 读取输入文件并与集合“123,200,300”进行比较,如果找到匹配,则给出匹配数据 200,300(来自 1 个输入行)

300(来自 2 个输入行)

123(来自 4 个输入行)

我写的:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.rdd.RDD

  object sparkApp {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("CountingSheep")
    val sc = new SparkContext(conf)

    def parseLine(invCol: String) : RDD[String]  = {
      println(s"INPUT, $invCol")
      val inv_rdd = sc.parallelize(Seq(invCol.toString))
      val bs_meta_rdd = sc.parallelize(Seq("123,200,300"))
      return inv_rdd.intersection(bs_meta_rdd)
    }

    def main(args: Array[String]) {
      val filePathName = "hdfs://xxx/tmp/input.csv"
      val rawData = sc.textFile(filePathName)
      val datad = rawData.map{r => parseLine(r)}
    }
  }

我得到以下异常:

java.lang.NullPointerException

请指出我哪里出错了

【问题讨论】:

  • 你是如何运行你的程序的?你的jar已经编译打包了吗?
  • 是的,我编译并打包了jar :: spark-submit --class "sparkApp" --master local --num-executors 2 --driver-memory 1g --executor-memory 1g --执行器核心 1 /home/spark_app/sparkApp/target/scala-2.10/sparkapp_2.10-1.0.jar
  • 当我运行代码时,我得到了异常
  • 啊,是的,现在查看代码!然而@giaosudau 说代码已编译并运行!
  • 我们如何实现什么?您的代码没有多大意义。你愿意解释一下你想用你的代码做什么吗?

标签: scala apache-spark


【解决方案1】:

问题解决了。这很简单。

val pfile = sc.textFile("/FileStore/tables/6mjxi2uz1492576337920/input.csv")
case class pSchema(id: Int, pName: String)
val pDF = pfile.map(_.split("\t")).map(p => pSchema(p(0).toInt,p(1).trim())).toDF()
pDF.select("id","pName").show()

定义UDF

val findP = udf((id: Int,
                    pName: String
                    ) => {
  val ids = Array("123","200","300")
  var idsFound : String = ""
  for (id  <- ids){
    if (pName.contains(id)){
      idsFound = idsFound + id + ","
    }
  }
  if (idsFound.length() > 0) {
    idsFound = idsFound.substring(0,idsFound.length -1)
  }    
 idsFound
})

在 withCoulmn() 中使用 UDF

pDF.select("id","pName").withColumn("Found",findP($"id",$"pName")).show()

【讨论】:

    【解决方案2】:

    为了简单的回答,为什么我们把它弄得这么复杂?在这种情况下,我们不需要 UDF。

    这是您的输入数据:

    200,300,889,767,9908,7768,9090|AAA
    300,400,223,4456,3214,6675,333|BBB
    234,567,890|CCC
    123,445,667,887|DDD
    

    你必须将它与123,200,300匹配

    val matchSet = "123,200,300".split(",").toSet
    val rawrdd = sc.textFile("D:\\input.txt")
    rawrdd.map(_.split("|"))
          .map(arr => arr(0).split(",").toSet.intersect(matchSet).mkString(",") + "|" + arr(1))
          .foreach(println)
    

    你的输出:

    300,200|AAA
    300|BBB
    |CCC
    123|DDD
    

    【讨论】:

    • 你用 "," 200,300,889,767,9908,7768,9090 分割输入行。根据要求,这是一个单列。不要拆分它。如果要拆分,请使用 \t
    • EI 知道@Manish,这是一个单列。请尝试理解我的代码。我只是在性能方面简明你的代码,因为你知道用户定义的函数在 spark 中很昂贵。尝试运行此代码。希望,它会解决您的问题。
    • 嗨@ManishSaraf,我刚刚更新了这段代码。如果您有任何问题,请检查并告诉我。
    【解决方案3】:

    您尝试做的事情无法按照您的方式完成。

    Spark 不支持嵌套 RDD(请参阅 SPARK-5063)。

    Spark 不支持嵌套 RDD 或在转换中执行 Spark 操作;这通常会导致 NullPointerExceptions(参见 SPARK-718 作为一个示例)。令人困惑的 NPE 是 StackOverflow 上最常见的 Spark 问题来源之一:

    我认为我们可以通过向 RDD 添加逻辑来检查 sc 是否为空(例如,将 sc 转换为 getter 函数)来检测这些错误;我们可以使用它来添加更好的错误消息。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-01-09
      • 2016-12-15
      • 1970-01-01
      • 2020-05-16
      • 2018-06-12
      • 2018-05-16
      • 1970-01-01
      相关资源
      最近更新 更多