【问题标题】:Scala Spark Map DataFrame Missing Paramenter TypeScala Spark Map DataFrame 缺少参数类型
【发布时间】:2017-06-23 22:36:08
【问题描述】:

我是 Spark 的新手,在映射 Dataframe 时收到错误。

我有一个 DStream,我想使用 sql Dataframe 对其进行转换以过滤数据。代码是这样的:

  val textDStream = ssc.textFileStream(inputPath)
  val activityStream = textDStream.transform(input => {
    input.flatMap { line =>
      val record = line.split("\\t")
      Some(Activity(record(0).toLong / MS_IN_HOUR * MS_IN_HOUR, record(1), record(2), record(3), record(4), record(5), record(6)))
      }
  })

activityStream.transform(rdd => {

    val df = rdd.toDF()

    df.registerTempTable("activity")
    val activityByProduct = sqlContext.sql("""SELECT
                                        product,
                                        timestamp_hour,
                                        sum(case when action = 'purchase' then 1 else 0 end) as purchase_count,
                                        sum(case when action = 'add_to_cart' then 1 else 0 end) as add_to_cart_count,
                                        sum(case when action = 'page_view' then 1 else 0 end) as page_view_count
                                        from activity
                                        group by product, timestamp_hour """)

    activityByProduct
      .map { r => ((r.getString(0), r.getLong(1)),
        ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4))
        )}

  }).print()

这里的问题是我收到以下错误:

错误:(58, 18) 缺少参数类型 .map { r => ((r.getString(0), r.getLong(1)),

activityByProduct
  .map { r => ((r.getString(0), r.getLong(1)),
    ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4))
    )}

我看不到类型丢失的地方。我已经尝试明确设置 r => type。但它继续返回错误。

会是什么?

提前致谢

【问题讨论】:

  • 您的.map() 返回什么?如果它是一个元组(元组,?),那么你需要更多的括号。 ActivityByProduct 是什么?
  • 事实上我正在返回一个(键,ActivityByProduct)的元组。其中 key = (product, timestamp_hour) = (r.getString(0), r.getLong(1) 这个key用于有状态操作
  • 我已更改为 activityByProduct .map { r => { val key = (r.getString(0), r.getLong(1)) val activity = ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4)) ActivityFull(key, activity) } } 它返回同样的错误
  • ActivityByProduct 是一个案例类
  • 你试过了:.map[((String,Long),ActivityByProduct)] { r => ....?

标签: scala apache-spark


【解决方案1】:

成功了。

在执行地图之前,我必须将数据帧转换为 rdd:

activityByProduct.rdd
      .map { r =>
        ((r.getString(0), r.getLong(1)),
        ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4))
        )}

查看activityByProduct后面的.rdd

【讨论】:

  • 奇怪。根据 Dataframe API 文档,它应该直接支持 .map 。什么版本的 Spark?
  • 不,这很正常,DataFrame 需要强大的架构定义才能工作。
【解决方案2】:

是的,这行得通。如果它必须工作,您需要将其转换为 rdd。它在以前的 Spark 版本中运行良好,但在 2.12 及更高版本中您将需要它。

【讨论】:

    【解决方案3】:

    您好,我也遇到了同样的问题。 当我将import hiveCtx.implicits._ 代码添加到我的代码val hiveCtx = new HiveContext (sc) 的下一行时,错误已被删除。因为此代码将RDD 隐式转换为DataFrame。 希望能帮到你。

    完整的代码贴在下面希望对你有所帮助。

    package spark.sparkSQL
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext
    
    object sparksql2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("sparksql").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
    
        val hiveCtx = new HiveContext(sc)
        import hiveCtx.implicits._        //  ImportType(hiveCtx.implicits)
    
        val input = hiveCtx.jsonFile("./inputFile")
        // Register the input schema RDD
        input.registerTempTable("tweets")
        hiveCtx.cacheTable("tweets")
        // Select tweets based on the retweetCount
        val topTweets = hiveCtx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
        topTweets.collect().map(println(_))
        val topTweetText = topTweets.map(row => row.getString(0))
      }
    }
    

    【讨论】:

      猜你喜欢
      • 2017-08-20
      • 2020-08-04
      • 1970-01-01
      • 2019-07-05
      • 2012-02-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-04-07
      相关资源
      最近更新 更多