【问题标题】:How can i convert RDD[Map[String,Any]] to a dataframe?如何将 RDD[Map[String,Any]] 转换为数据框?
【发布时间】:2020-03-09 21:38:37
【问题描述】:

我有一个 RDD[Map[String,Any]],我正在尝试将其转换为 Dataframe。我没有可以指定数据框的架构。

我试图做一个 rdd.toDF 但这没有帮助。它抛出如下错误。

Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:512)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.SQLImplicits.newMapEncoder(SQLImplicits.scala:172)

示例输入

val data: RDD[Map[String, Any]] = appContext.sc.parallelize(List(
      Map("A" -> "B"),              //Value could be String
      Map("C" -> 123),              //Value could be Numerical(Long, Double, Int etc)
      Map("D" -> Map("E" -> "F")),  // Could be another Map
      Map("G" -> List("H" , "I")),  // List of values
      Map("J" -> List(              // List Of Maps
        Map("K" -> "L"),
        Map("M" -> "N")
      ))
    ))

通过执行以下操作(JsonUtils 是 Jackson 的包装器),我能够将其变成数据框,但给我带来了性能问题。

def convert(data: RDD[Map[String, Any]]): DataFrame = {
        sparkSession.read.json(data.map(each => JsonUtils.toJson(each)))
      }

我们可以使用其他任何方法来实现这一目标,从而提供更好的性能吗?任何建议都非常感谢!

更新:我没有使用 DataFrame 进行任何处理。我只想以 3 种不同的格式编写输出,并且转换为 DataFrame 是我能找到的获得一致输出的最佳方法。在不实际转换为 Dataframe 的情况下实现此目的的任何其他建议也将非常有帮助。

df.write.avro("/path/to/avroFile")
df.write.parquet("/path/to/parquetFile")
df.write.json("/path/to/jsonFile")

【问题讨论】:

  • 你能用一些示例输入和输出数据更新你的问题吗?
  • 添加样本输入
  • 转换和使用单列类型(不是Any)将提供最佳性能。我想这不是一个选择吗?也许这会让你知道你可以测试什么:stackoverflow.com/questions/41504976/…
  • 请问您为什么要使用数据框?
  • 我只是将其转换为 JSON,以便我可以将其写成 3 种不同的输出格式,即 AVRO、PARQUET 和 JSON。我并没有真正使用 Dataframe 进行任何处理。 1. df.write.avro("/path/toFile") 2. df.write.parquet("/path/toFile") 3. df.write.json("/path/toFile") 更新问题以反映用例也是

标签: dataframe apache-spark apache-spark-sql rdd databricks


【解决方案1】:

您将无法将包含 Any 的 RDD 转换为 Dataframe。

但是,也许您可​​以将您的初始 RDD 分开(也许这对您有用),例如:

一个只有 Map(String, String) 的 RDD,另一个带有 Map(String, Int) 等的 RDD。

获得 RDD 后,您可以使用 toDF 方法将它们转换为 DF,最后加入它们,因此最终您将拥有一个包含以下内容的 Dataframe:

+-----+-------------+----------+-----------------+
| Key | StringValue | IntValue |    MapValue     |
+-----+-------------+----------+-----------------+
| A   | SomeString  |      123 | Map("A" -> "B") |
| B   | SomeString  |      456 | Map("B" -> "C") |
+-----+-------------+----------+-----------------+

【讨论】:

    猜你喜欢
    • 2017-08-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-11
    • 2020-06-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多