【Question Title】: Spark Scala - compile errors
【Posted】: 2021-03-21 11:10:10
【Question Description】:

I have a script in Scala that works fine when I run it in Zeppelin, but when I try to compile it with sbt it fails. I believe it is version-related, but I can't pin it down.

All three of these approaches return the same error:

val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collect.toMap
val catMap = catDF.select($"description", $"id".cast("int")).as[(String, Int)].collect.toMap
val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collectAsMap()

They return the error: "value rdd is not a member of Unit"

val bizCat = bizCatRDD.rdd.map(t => (t.getAs[String](0),catMap(t.getAs[String](1)))).toDF

This returns the error: "value toDF is not a member of org.apache.spark.rdd.RDD[U]"

Scala version: 2.12; sbt version: 1.3.13

Update: the full class is:

package importer

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import org.apache.spark.sql._

import org.apache.spark.sql.functions._
import udf.functions._

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Column

object BusinessImporter extends Importer{

    def importa(spark: SparkSession, inputDir: String): Unit = {
        
        import spark.implicits._
        val bizDF = spark.read.json(inputDir).cache

        // categories
        val explode_categories = bizDF.withColumn("categories", explode(split(col("categories"), ",")))
        val sort_categories = explode_categories.select(col("categories").as("description"))
                .distinct
                .coalesce(1)
                .orderBy(asc("categories"))
        // Create sequence column
        val windowSpec = Window.orderBy("description")
        val categories_with_sequence = sort_categories.withColumn("id",row_number.over(windowSpec))
        val categories = categories_with_sequence.select("id","description")

        val catDF = categories.write.insertInto("categories")

        // business categories
        //val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collect.toMap
        //val catMap = catDF.select($"description", $"id".cast("int")).as[(String, Int)].collect.toMap
        val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collectAsMap()
        val auxbizCatRDD = bizDF.withColumn("categories", explode(split(col("categories"), ",")))
        val bizCatRDD = auxbizCatRDD.select("business_id","categories")
        val bizCat = bizCatRDD.rdd.map(t => (t.getAs[String](0),catMap(t.getAs[String](1)))).toDF
        bizCat.write.insertInto("business_category")

        // Business
        val businessDF = bizDF.select("business_id","categories","city","address","latitude","longitude","name","is_open","review_count","stars","state")
        businessDF.coalesce(1).write.insertInto("business")

        // Hours
        val bizHoursDF = bizDF.select("business_id","hours.Sunday","hours.Monday","hours.Tuesday","hours.Wednesday","hours.Thursday","hours.Friday","hours.Saturday")

        val bizHoursDF_structs = bizHoursDF
            .withColumn("Sunday",struct(
            split(col("Sunday"),"-").getItem(0).as("Open"),
            split(col("Sunday"),"-").getItem(1).as("Close")))
            .withColumn("Monday",struct(
            split(col("Monday"),"-").getItem(0).as("Open"),
            split(col("Monday"),"-").getItem(1).as("Close")))
            .withColumn("Tuesday",struct(
            split(col("Tuesday"),"-").getItem(0).as("Open"),
            split(col("Tuesday"),"-").getItem(1).as("Close")))
            .withColumn("Wednesday",struct(
            split(col("Wednesday"),"-").getItem(0).as("Open"),
            split(col("Wednesday"),"-").getItem(1).as("Close")))
            .withColumn("Thursday",struct(
            split(col("Thursday"),"-").getItem(0).as("Open"),
            split(col("Thursday"),"-").getItem(1).as("Close")))
            .withColumn("Friday",struct(
            split(col("Friday"),"-").getItem(0).as("Open"),
            split(col("Friday"),"-").getItem(1).as("Close")))
            .withColumn("Saturday",struct(
            split(col("Saturday"),"-").getItem(0).as("Open"),
            split(col("Saturday"),"-").getItem(1).as("Close")))

        bizHoursDF_structs.coalesce(1).write.insertInto("business_hour")
        
    }

    def singleSpace(col: Column): Column = {
        trim(regexp_replace(col, " +", " "))
    }
}

The sbt file:

name := "yelp-spark-processor"
version := "1.0"
scalaVersion := "2.12.12"

libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12"  % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-hive_2.12" % "3.0.1"
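(As an aside, unrelated to the compile error: the `%%` operator is the usual sbt convention for these dependencies, since it appends the Scala binary-version suffix automatically and keeps it in sync with `scalaVersion`.)

```scala
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.1",
  "org.apache.spark" %% "spark-sql"  % "3.0.1",
  "org.apache.spark" %% "spark-hive" % "3.0.1"
)
```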

Can someone give me some direction on what is going wrong?

Many thanks, Xavy

【Question Discussion】:

  • How did you create bizCatRDD?
  • What does your build file look like?
  • Hi all, I've updated the post to show the full class and the sbt file. Many thanks, Xavy

Tags: scala apache-spark sbt apache-zeppelin


【Solution 1】:

The problem here is that in Scala this line returns the type Unit:

val catDF = categories.write.insertInto("categories")

Unit in Scala is like void in Java: it is what functions return when they don't return anything meaningful. So at this point catDF is not a DataFrame, and you can't treat it as one. You probably want to keep using categories instead of catDF in the lines that follow.
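The mistake and the fix can be sketched without Spark at all. Here `insertInto` is a hypothetical stand-in for `DataFrameWriter.insertInto`, which likewise returns Unit:

```scala
// Hypothetical stand-in for DataFrameWriter.insertInto: side effect only, returns Unit
def insertInto(table: String): Unit = ()

val categories = Seq((1, "Restaurants"), (2, "Bars"))

// Wrong: capturing the result of a Unit-returning call yields a value of type Unit,
// so something like catDF.rdd fails with "value rdd is not a member of Unit"
val catDF: Unit = insertInto("categories")

// Right: perform the write for its side effect only, and keep using the original value
insertInto("categories")
val catMap = categories.map { case (id, desc) => desc -> id }.toMap
assert(catMap("Bars") == 2)
```

In the original class that means dropping the `val catDF =` assignment, calling `categories.write.insertInto("categories")` on its own line, and building `catMap` from `categories` directly.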

【Discussion】:

  • Thank you very much. It works perfectly and solved my problem. I'm taking my first small steps with Scala and Spark, so basic issues like this come up sometimes.
  • Glad to hear it, happy to help. You may want to look into a proper Scala IDE such as IntelliJ IDEA; it will help you a lot with this kind of problem.