【问题标题】:unable to apply gpfdist protocol while working with spark使用 spark 时无法应用 gpfdist 协议
【发布时间】:2019-03-05 17:59:58
【问题描述】:

我正在尝试使用 spark 将数据从 greenplum 读取到 HDFS。为此,我使用了 jar 文件:greenplum-spark_2.11-1.6.0.jar

应用 spark.read 如下:

val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", "jdbc:postgresql://1.2.3.166:5432/finance?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory").option("server.port","8020").option("dbtable", "tablename").option("dbschema","schema").option("user", "123415").option("password", "etl_123").option("partitionColumn","je_id").option("partitions",3).load().where("period_year=2017 and period_num=12 and source_system_name='SSS'").select(splitSeq map col:_*).withColumn("flagCol", lit(0))
yearDF.write.format("csv").save("hdfs://dev/apps/hive/warehouse/header_test_data/")

当我运行上面的代码时,我得到了异常:

Exception in thread "qtp1438055710-505" java.lang.OutOfMemoryError: GC overhead limit exceeded
19/03/05 12:29:08 WARN QueuedThreadPool:
java.lang.OutOfMemoryError: GC overhead limit exceeded
19/03/05 12:29:08 WARN QueuedThreadPool: Unexpected thread death: org.eclipse.jetty.util.thread.QueuedThreadPool$3@16273740 in qtp1438055710{STARTED,8<=103<=200,i=19,q=0}
19/03/05 12:36:03 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 8)
org.postgresql.util.PSQLException: ERROR: error when writing data to gpfdist http://1.2.3.8:8020/spark_6ca7d983d07129f2_db5510e67a8a6f78_driver_370, quit after 2 tries (url_curl.c:584)  (seg7 ip-1-3-3-196.ec2.internal:40003 pid=4062) (cdbdisp.c:1322)
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2310)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2023)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:217)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
    at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:294)
    at com.zaxxer.hikari.pool.ProxyStatement.executeUpdate(ProxyStatement.java:120)
    at com.zaxxer.hikari.pool.HikariProxyStatement.executeUpdate(HikariProxyStatement.java)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$$anonfun$2.apply(Jdbc.scala:81)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$$anonfun$2.apply(Jdbc.scala:79)
    at resource.AbstractManagedResource$$anonfun$5.apply(AbstractManagedResource.scala:88)
    at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
    at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
    at scala.util.control.Exception$Catch.apply(Exception.scala:103)
    at scala.util.control.Exception$Catch.either(Exception.scala:125)
    at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
    at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
    at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
    at resource.DeferredExtractableManagedResource$$anonfun$tried$1.apply(AbstractManagedResource.scala:33)
    at scala.util.Try$.apply(Try.scala:192)
    at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$.copyTable(Jdbc.scala:83)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.liftedTree1$1(GreenplumRowIterator.scala:105)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.<init>(GreenplumRowIterator.scala:104)
    at io.pivotal.greenplum.spark.GreenplumRDD.compute(GreenplumRDD.scala:49)

我按照官方documentation中提到的步骤应用了这些步骤

之前我使用了 jar:greenplum.jar,它工作正常,但速度较慢,因为它通过 GP Master 提取数据。 jar:greenplum-spark_2.11-1.6.0.jar 是一个连接器 jar,它使用gpfdist 协议将数据拉取到 HDFS。

异常消息中的 IP 地址也发生了变化。你可以看到IP1.2.3.166:5432变成1.2.3.8:8020seg7 ip-1-3-3-196.ec2.internal:40003 pid=4062

使用相同数量的执行器和执行器内存,我可以使用greenplum.jar 检索数据。但是保持一切不变,只是将 jar 更改为 greenplum-spark_2.11-1.6.0.jar 只是为了面对这个异常。 我一直在尝试解决这个问题,但我根本不理解这种现象。 谁能告诉我如何解决这个问题?

【问题讨论】:

    标签: apache-spark greenplum


    【解决方案1】:

    Greenplum-Spark 连接器旨在并行处理 Greenplum 段和 Spark 工作器之间的数据传输。为了充分利用并行数据传输,您必须提供足够的内存和 spark workers ,以加快数据传输。否则,您可以使用使用单个 JDBC 连接器的 greenplum.jar 通过单个 Greenplum master 将数据从 HDFS 加载到 Greenplum 数据库中。当您将数据加载到单个 Greenplum 主服务器时,速度会明显变慢。

    一些注意事项: - 取决于 Greenplum 段的数量,您是否有足够的 Spark 工作人员/执行程序来在 Spark 和 Greenplum 集群之间接收或发送数据?
    - 取决于分配给 Spark 工作者/执行者的内存。参考“Tuning Spark”文档

    从带有此消息“java.lang.OutOfMemoryError:GC 开销限制超出”的错误日志中,我可以假设您的 spark worker/executors 没有足够的内存。您仍然需要调整您的 Spark 工作线程,以便它可以并行地从 HDFS 加载数据。

    【讨论】:

      【解决方案2】:

      你能增加分区的数量吗?根据表的大小,您可能需要增加分区数。您可以尝试将分区数量增加到 30 个,看看是否仍然遇到内存不足的问题?

      val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", "jdbc:postgresql://1.2.3.166:5432/finance?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory").option("server.port","8020").option("dbtable", "tablename").option("dbschema","schema").option("user", "123415").option("password", "etl_123").option("partitionColumn","je_id").option("partitions",30).load().where("period_year=2017 and period_num=12 and source_system_name='SSS'").select(splitSeq map col:_*).withColumn("flagCol", lit(0))
      yearDF.write.format("csv").save("hdfs://dev/apps/hive/warehouse/header_test_data/")
      

      【讨论】:

      • 只有200万行,表中的列数是37。即使我给一个4gb内存的分区,spark应该可以处理它。但我不认为我应用了正确的协议。
      • 我认为您使用协议的方式没有任何异常。但是,GC 开销限制错误确实让我认为 Spark 的数据量存在问题。也许尝试 10 个分区 .option("partitions",10) 看看你是否能够克服 GC 问题。
      猜你喜欢
      • 2023-03-07
      • 1970-01-01
      • 2019-11-07
      • 2017-05-23
      • 2021-05-01
      • 2014-10-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多