【发布时间】:2020-03-09 21:38:37
【问题描述】:
我有一个 RDD[Map[String,Any]],我正在尝试将其转换为 Dataframe。我没有可以指定数据框的架构。
我试图做一个 rdd.toDF 但这没有帮助。它抛出如下错误。
Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:512)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.SQLImplicits.newMapEncoder(SQLImplicits.scala:172)
示例输入
val data: RDD[Map[String, Any]] = appContext.sc.parallelize(List(
Map("A" -> "B"), //Value could be String
Map("C" -> 123), //Value could be Numerical(Long, Double, Int etc)
Map("D" -> Map("E" -> "F")), // Could be another Map
Map("G" -> List("H" , "I")), // List of values
Map("J" -> List( // List Of Maps
Map("K" -> "L"),
Map("M" -> "N")
))
))
通过执行以下操作(JsonUtils 是 Jackson 的包装器),我能够将其变成数据框,但给我带来了性能问题。
def convert(data: RDD[Map[String, Any]]): DataFrame = {
sparkSession.read.json(data.map(each => JsonUtils.toJson(each)))
}
我们可以使用其他任何方法来实现这一目标,从而提供更好的性能吗?任何建议都非常感谢!
更新:我没有使用 DataFrame 进行任何处理。我只想以 3 种不同的格式编写输出,并且转换为 DataFrame 是我能找到的获得一致输出的最佳方法。在不实际转换为 Dataframe 的情况下实现此目的的任何其他建议也将非常有帮助。
df.write.avro("/path/to/avroFile")
df.write.parquet("/path/to/parquetFile")
df.write.json("/path/to/jsonFile")
【问题讨论】:
-
你能用一些示例输入和输出数据更新你的问题吗?
-
添加样本输入
-
转换和使用单列类型(不是
Any)将提供最佳性能。我想这不是一个选择吗?也许这会让你知道你可以测试什么:stackoverflow.com/questions/41504976/… -
请问您为什么要使用数据框?
-
我只是将其转换为 JSON,以便我可以将其写成 3 种不同的输出格式,即 AVRO、PARQUET 和 JSON。我并没有真正使用 Dataframe 进行任何处理。 1. df.write.avro("/path/toFile") 2. df.write.parquet("/path/toFile") 3. df.write.json("/path/toFile") 更新问题以反映用例也是
标签: dataframe apache-spark apache-spark-sql rdd databricks