【Question Title】: Load CSV File in BigQuery with Dataproc (Spark)
【Posted】: 2018-02-26 10:45:34
【Question】:

I am trying to read data from a CSV file stored in GCS and save it into a BigQuery table.

Here is my CSV file:

1,Marc,B12,2017-03-24
2,Marc,B12,2018-01-31
3,Marc,B21,2017-03-17
4,Jeam,B12,2017-12-30
5,Jeam,B12,2017-09-02
6,Jeam,B11,2018-06-30
7,Jeam,B21,2018-03-02
8,Olivier,B20,2017-12-30

Here is my code:

import com.google.cloud.hadoop.io.bigquery.{BigQueryConfiguration, BigQueryFileFormat}
import com.google.cloud.hadoop.io.bigquery.output.{BigQueryOutputConfiguration, IndirectBigQueryOutputFormat}
import com.google.gson.Gson
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Hyp-session-bq")
  .config("spark.master", "local")
  .getOrCreate()
val sc: SparkContext = spark.sparkContext

val conf = sc.hadoopConfiguration

// Input parameters
val projectId = conf.get("fs.gs.project.id")
val bucket = conf.get("fs.gs.system.bucket")
val inputTable = s"$projectId:rpc.testBig"

// Input configuration
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
BigQueryConfiguration.configureBigQueryInput(conf, inputTable)

// Output parameters
val outPutTable = s"$projectId:rpc.outTestBig"

// Temp output path in GCS; the connector loads from here and deletes it when the job completes
val outPutGcsPath = "gs://" + bucket + "/hadoop/tmp/outTestBig"

BigQueryOutputConfiguration.configure(conf,
  outPutTable,
  null,
  outPutGcsPath,
  BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
  classOf[TextOutputFormat[_, _]])

conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[_, _]].getName)

// Truncate the table before writing output to allow multiple runs
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY, "WRITE_TRUNCATE")

// Each CSV line contains no spaces, so this flatMap simply yields one record per line
val text_file = sc.textFile("gs://test_files/csvfiles/test.csv")
val lignes = text_file.flatMap(x => x.split(" "))

case class schemaFile(id: Int, name: String, symbole: String, date: String)

// Parse one CSV line into the case class
def parseStringWithCaseClass(str: String): schemaFile = {
  val fields = str.split(",")
  schemaFile(fields(0).toInt, fields(1), fields(2), fields(3))
}

// Convert each record to JSON and write it through the indirect BigQuery output format
val result1 = lignes.map(x => parseStringWithCaseClass(x))
val x = result1.map(elem => (null, new Gson().toJsonTree(elem)))
x.saveAsNewAPIHadoopDataset(conf)

When I run the code, I get this error:

ERROR org.apache.spark.internal.io.SparkHadoopMapReduceWriter: Aborting job job_20180226083501_0008.
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "Load configuration must specify at least one source URI",
    "reason" : "invalid"
  } ],
  "message" : "Load configuration must specify at least one source URI"
}
        at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.insertJobOrFetchDuplicate(BigQueryHelper.java:306)
        at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:160)
        at com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
        at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
        at jeam.BigQueryIO$.main(BigQueryIO.scala:115)
        at jeam.BigQueryIO.main(BigQueryIO.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  

I think the problem is with the case class and parseStringWithCaseClass, but I don't know how to fix it. The configuration is not the issue, because I get the expected result when I run the word-count example: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
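
As a sanity check, the parsing step can be run on its own by collecting a few parsed records on the driver and printing the JSON that would be handed to the connector (a small sketch reusing lignes, parseStringWithCaseClass and Gson from the code above):

  // Sketch: parse a handful of lines locally and inspect the resulting JSON payload
  lignes.take(5)
    .map(parseStringWithCaseClass)
    .map(rec => new Gson().toJsonTree(rec).toString)
    .foreach(println)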

【Comments】:

  • Why do you need Spark to load a CSV file into BigQuery?
  • Because I have a data lake in GCS and I want to load the CSV files into BigQuery for SQL users and for visualization.
  • But you don't need Spark for that: you can load from GCS into BigQuery directly (see the sketch after this list).
  • How can I do that automatically and with some control over the data, for example splitting my CSV and keeping only a specific column?
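
As a rough illustration of the comment above, a direct GCS-to-BigQuery load (no Spark involved) could be done with the google-cloud-bigquery client library along these lines; the dataset/table names, the source URI and the schema autodetection are only assumptions for the sketch, not part of the original question:

  import com.google.cloud.bigquery.{BigQueryOptions, FormatOptions, JobInfo, LoadJobConfiguration, TableId}

  // Sketch: submit a BigQuery load job that reads the CSV directly from GCS.
  // Assumes the dataset "rpc" exists and default credentials are available.
  val bigquery = BigQueryOptions.getDefaultInstance.getService
  val loadConfig = LoadJobConfiguration.newBuilder(
      TableId.of("rpc", "outTestBig"),
      "gs://test_files/csvfiles/test.csv")
    .setFormatOptions(FormatOptions.csv())
    .setAutodetect(true)                                   // infer the schema from the CSV
    .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
    .build()
  val job = bigquery.create(JobInfo.of(loadConfig)).waitFor()
  println(s"Load job finished with status: ${job.getStatus}")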

Tags: apache-spark google-bigquery google-cloud-storage google-cloud-dataproc


【Solution 1】:

Try using a Tuple4:

  def parseStringWithTuple(str: String): Tuple4[Int, String, String, String] = {
      val id = str.split(",")(0).toInt
      val name = str.split(",")(1)
      val symbole = str.split(",")(2)
      val date = str.split(",")(3)
      (id, name, symbole, date)
    }
  val result1 = lignes.map(x => parseStringWithTuple(x))

However, I also tested your original code and it worked fine for me.

【Discussion】:

【Solution 2】:

I have been running some tests with your code using my own BigQuery table and CSV file, and it worked for me without any additional modification.

I see that your code started working once you changed the CaseClass to a Tuple4, as suggested by @jean-marc, so this is strange behavior, all the more so because for both him and me your code actually works without further changes. The error Load configuration must specify at least one source URI usually appears when the load job in BigQuery is not configured correctly and does not receive valid Cloud Storage object URLs. However, if the exact same code works after only changing to a Tuple4, and the CSV file you use is the same and has not been modified (i.e. the URL is valid), it may have been a transient issue, possibly related to Cloud Storage or BigQuery rather than to the Dataproc job itself.

Finally, given that this issue appears to be specific to you (the same code has worked for at least two other users), once you have checked for problems with the Cloud Storage objects (permissions, wrong location, etc.), you may want to file a bug in the Public Issue Tracker.
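
If you want to rule out a problem with the temporary Cloud Storage path that the connector loads from, one possible check (a sketch reusing conf and outPutGcsPath from the question) is to list that path after a failed run and see whether any part files were actually written there for the load job to pick up:

  import java.net.URI
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Sketch: list the temporary GCS output path the connector hands to the BigQuery load job.
  // If it is empty after the Spark write, the load job has no source URIs to work with.
  val fs = FileSystem.get(new URI(outPutGcsPath), conf)
  fs.listStatus(new Path(outPutGcsPath)).foreach(status => println(status.getPath))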

【Discussion】:
