[Question Title]: Having NotSerializableException when managing a JavaRDD<Row> - Cassandra
[Posted]: 2017-11-22 23:56:39
[Question Description]:

I am pulling data from a Cassandra database. I get the ResultSet correctly, and turning it into a JavaRDD<Row> also works fine, but when I run mapToPair on the JavaRDD<Row> to get a JavaPairRDD<String, Integer>, I get this error:

[2017-06-20 13:59:53,038] ERROR Exception encountered (org.apache.spark.util.Utils:91) java.io.NotSerializableException: com.datastax.driver.core.ArrayBackedRow

and so on. Also, while debugging, I get a "source not found" message in the debug console. Here is my code:

ResultSet rs = cm.getTweetsWithTime(topic.getText(), oraInizio.getText(), oraFine.getText());
JavaRDD<Row> queryResults = jsc.parallelize(rs.all());
JavaPairRDD<String, Integer> popTweets = queryResults.mapToPair(
        x -> new Tuple2<String, Integer>(
                x.getString("text"),
                x.getInt("likecount") + x.getInt("retweetcount")));

The full stack trace:

[2017-06-20 15:35:48,488] ERROR Exception encountered (org.apache.spark.util.Utils:91)
java.io.NotSerializableException: com.datastax.driver.core.ArrayBackedRow
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeArray(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteObject(Unknown Source)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:246)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:452)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:432)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:432)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:264)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:259)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:333)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:331)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:331)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:328)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:328)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

How can I fix this error? Thanks for your help.

[Question Comments]:

  • Can you provide the full stack trace?
  • I did; check the edit. Thanks for your time. @YuvalItzchakov
  • What does rs.all() return?
  • It returns an empty list (strangely, if I run the query in its own class, it returns a full-size list).
  • Who is producing the ArrayBackedRow instances?

Tags: java apache-spark cassandra datastax


[Solution 1]:

Could you try the map function on the ResultSet and map it to a case class? It works in my Scala code.

 import scala.collection.JavaConverters._

 case class DataModelBean(i: Integer)
 // rs.all() returns a java.util.List, so convert it to a Scala collection first.
 val mapped = rs.all().asScala.map(f => DataModelBean(f.getInt("id")))
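
Translated into Java for the code in the question, the same idea looks roughly like the sketch below: extract only plain, serializable values from each com.datastax.driver.core.Row before handing anything to Spark, so the non-serializable ArrayBackedRow never ends up inside an RDD partition. The helper name toPopTweets is hypothetical; the column names (text, likecount, retweetcount) and the rs/jsc variables are taken from the question.

 import java.util.ArrayList;
 import java.util.List;

 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;

 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;

 import scala.Tuple2;

 public class PopularTweets {

     // rs and jsc correspond to the ResultSet and JavaSparkContext
     // already present in the question's code.
     static JavaPairRDD<String, Integer> toPopTweets(ResultSet rs,
                                                     JavaSparkContext jsc) {
         // Read the fields out of each driver Row on the driver side,
         // keeping only serializable String/Integer values.
         List<Tuple2<String, Integer>> tweets = new ArrayList<>();
         for (Row row : rs) {
             tweets.add(new Tuple2<>(
                     row.getString("text"),
                     row.getInt("likecount") + row.getInt("retweetcount")));
         }
         // Only plain tuples cross into Spark, so the task serialization
         // shown in the stack trace has nothing non-serializable to write.
         return jsc.parallelizePairs(tweets);
     }
 }

With this shape, the ParallelCollectionPartition being serialized holds only String/Integer tuples instead of driver Row objects. For larger result sets, the spark-cassandra-connector, which reads from Cassandra directly into RDDs, is usually preferable to fully materializing the ResultSet and parallelizing it.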

[Comments]:
