【问题标题】:Writing a SimpleFeature via Spark RDDs to Cassandra通过 Spark RDD 将 SimpleFeature 写入 Cassandra
【发布时间】:2019-03-06 22:09:08
【问题描述】:

我想知道是否可以在 Spark 上下文中向 Cassandra 写入 SimpleFeature?我正在尝试将我的数据的 SimpleFeatures 映射到 Spark RDD,但我遇到了一些问题。以下被调用的 createFeature() 函数在独立单元测试中工作正常,我有另一个单元测试调用它,并成功通过 GeoMesa api 使用它生成的 SimpleFeature 写入 Cassandra:

import org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator

. . .

private val sparkConf = new SparkConf(true).set("spark.cassandra.connection.host","localhost").set("spark.serializer","org.apache.spark.serializer.KryoSerializer").set("spark.kryo.registrator",classOf[GeoMesaSparkKryoRegistrator].getName).setAppName(appName).setMaster(master)

. . .                                            

val rowsRDD = processedRDD.map(r => {

...

println("** NAME VALUE MAP **")

for ((k,v) <- featureNamesValues) printf("key: %s, value: %s\n", k, v)

val feature = MyGeoMesaManager.createFeature(featureTypeConfig.asJava,featureNamesValues.asJava)
feature
})

rowsRDD.print()

但是,我现在在 Spark 上下文中的 RDD 的 map() 函数中调用函数这一事实导致 SimpleFeatureImpl 上的序列化错误,原因是 Spark 分区:

18/02/12 08:00:46 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID 
9)
java.io.NotSerializableException: org.geotools.feature.simple.SimpleFeatureImpl
Serialization stack:
- object not serializable (class: org.geotools.feature.simple.SimpleFeatureImpl, value: SimpleFeatureImpl:myfeature=[SimpleFeatureImpl.Attribute: . . ., SimpleFeatureImpl.Attribute: . . .])
- element of array (index: 0)
- array (class [Lorg.opengis.feature.simple.SimpleFeature;, size 4)

好吧,然后我添加了 geomesa spark 核心页面上提到的 kyro 依赖项,以减轻这种情况,但是现在我在执行 map 函数时在 GeoMesaSparkKryoRegistrator 类上收到 NoClassDefFoundError,但是正如您所看到的 geomesa -spark-core 依赖存在于类路径中,我可以导入该类:

18/02/12 08:08:37 ERROR Executor: Exception in task 0.0 in stage 26.0 (TID 
11)
java.lang.NoClassDefFoundError: Could not initialize class org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator$
at org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator$$anon$1.write(GeoMesaSparkKryoRegistrator.scala:36)
at org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator$$anon$1.write(GeoMesaSparkKryoRegistrator.scala:32)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:315)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

最后,我尝试将 com.esotericsoftware.kryo 依赖项添加到类路径中,但我得到了同样的错误。

是否有可能用 GeoMesa、Spark 和 Cassandra 做我想做的事情?感觉就像我在 1 码线上,但我不能完全进入。

【问题讨论】:

    标签: apache-spark geomesa


    【解决方案1】:

    设置类路径的最简单方法是使用 maven 和 maven shade 插件。添加对 geomesa-cassandra-datastore 和 geomesa-spark-geotools 模块的依赖:

    <dependency>
      <groupId>org.locationtech.geomesa</groupId>
      <artifactId>geomesa-cassandra-datastore_2.11</artifactId>
    </dependency>
    <dependency>
      <groupId>org.locationtech.geomesa</groupId>
      <artifactId>geomesa-spark-geotools_2.11</artifactId>
    </dependency>
    

    然后添加一个 maven 阴影插件,类似于用于 Accumulo 的 here。使用阴影 jar 提交您的 spark 作业,类路径应该包含所需的一切。

    【讨论】:

    • 好的,是的,我确实在类路径上有这两个依赖项,尽管我没有使用阴影 jar。包含正确的依赖项是否足以序列化对象并让它们通过 GeoMesa api 正确写入 Cassandra,或者我是否还需要 GeoMesaSpark 对象来获取 spatialRDDProvider 等,正如 geomesa-spark-core 页面提到的那样?我查看了 github 上的代码,我认为 Cassandra 没有 SpatialRDDProvider。所以这是我最关心的问题 - 是否可能 做我想做的事情(即 - 使用 Cassandra 和 Spark,而不是 Accumulo 和 Spark)
    • 您似乎打算使用 spark 内部的常规 Cassandra 数据存储,这应该可以正常工作。包含geomesa spark模块主要是为了获取序列化位。或者,虽然没有针对 Cassandra 优化的 SpatialRDDProvider,但您可以改用通用的 GeoToolsSpatialRDDProvider。我仍然建议创建一个带阴影的 jar 来设置类路径。
    • 好的,感谢您的回复和指导。是的,这就是我目前正在做的事情,它似乎在我不包含 Spark 上下文的单元测试中运行良好。为了达到这一点,我做了很多工作,我只是希望序列化/kyro 依赖问题是让我此时无法写信给 Cassandra 的全部原因。好的,我对带阴影的罐子了解不多,但会研究它。再次感谢埃米利奥。如果成功我会回帖
    • 环境中似乎没有设置一些东西,因为对 GeoMesaSparkKryoRegistratorEndpoint.init() 的调用是第 43 行失败的原因,即“Option(SparkEnv.get).foreach {”当 GeoMesaSparkKryoRegistrator 试图加载时。我只是不知道为什么 SparkEnv.get 失败了。是否需要设置其他一些 spark 或 geomesa kyro 系统属性?我在 GeoMesaSparkKryoRegistratorEndpoint 中看到了对“spark.geomesa.kryo.rpc.enable”的引用。谢谢
    • 我想做的是在单元测试中运行它(如 Spark 集成测试),我没有提交作业来测试它。这就是为什么我希望解决单元测试环境中的依赖问题,这样我就不必多次提交 Spark 作业。我有许多其他类似的测试连接到我的本地 Spark,并且我已经通过其中一个测试通过 GeoMesa 写入 Cassandra,因此我的设置对于所有其他测试都是正确的。似乎只是 Spark GeoMesa 依赖项导致我试图创建一个 sf RDD 的这个单元测试出现问题
    猜你喜欢
    • 2019-11-12
    • 2021-02-03
    • 2020-06-25
    • 2016-01-30
    • 2020-12-14
    • 2015-01-30
    • 2020-05-23
    • 2018-01-12
    • 2017-06-26
    相关资源
    最近更新 更多