【Question Title】: Error while converting RDD to Dataframe [PySpark]
【Posted】: 2021-02-10 01:19:24
【Question Description】:

I am trying to convert an RDD back into a Spark DataFrame with the following code:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType

schema = StructType(
    [StructField("msn", StringType(), True),
     StructField("Input_Tensor", ArrayType(DoubleType()), True)]
)
DF = spark.createDataFrame(rdd, schema=schema)

The dataset has only two columns:

  • msn contains a single string.
  • Input_Tensor, a two-dimensional array of floats.

But I keep getting this error, and I am not sure where it comes from:

File "/tmp/conda-d3f87356-6008-4349-9075-f488e0870d02/real/envs/conda-env/lib/python3.6/site-packages/myproject/datasets/train.py", line 51, in EMA_detector
    DF = spark.createDataFrame(rdd, schema=schema)
  File "/tmp/conda-d3f87356-6008-4349-9075-f488e0870d02/real/envs/conda-env/lib/python3.6/site-packages/pyspark/sql/session.py", line 790, in createDataFrame
    jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
  File "/tmp/conda-d3f87356-6008-4349-9075-f488e0870d02/real/envs/conda-env/lib/python3.6/site-packages/pyspark/rdd.py", line 2364, in _to_java_object_rdd
    return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
  File "/tmp/conda-d3f87356-6008-4349-9075-f488e0870d02/real/envs/conda-env/lib/python3.6/site-packages/pyspark/rdd.py", line 2599, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/tmp/conda-d3f87356-6008-4349-9075-f488e0870d02/real/envs/conda-env/lib/python3.6/site-packages/pyspark/rdd.py", line 2500, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/tmp/conda-d3f87356-6008-4349-9075-f488e0870d02/real/envs/conda-env/lib/python3.6/site-packages/pyspark/rdd.py", line 2486, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/tmp/conda-d3f87356-6008-4349-9075-f488e0870d02/real/envs/conda-env/lib/python3.6/site-packages/pyspark/serializers.py", line 694, in dumps
    raise pickle.PicklingError(msg)

_pickle.PicklingError: Could not serialize object: AttributeError: 'NoneType' object has no attribute 'items'

Edit:

My RDD comes from this:

rdd = test_data.mapPartitions(lambda part: vectorizer.transform(part))

The dataset test_data is itself an RDD, but somehow after mapPartitions it becomes a PipelinedRDD, and that seems to be where it fails.
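A common cause of this PicklingError is that the function passed to mapPartitions captures a driver-side object (here, vectorizer) whose internal state cannot be serialized; Spark must pickle the whole closure to ship it to executors. The following is a minimal, Spark-free sketch of that failure mode. The Vectorizer class and its lock are illustrative stand-ins, not the asker's actual object:

```python
import pickle
import threading


class Vectorizer:
    """Hypothetical stand-in for a transformer captured by mapPartitions.

    It holds a lock, which (like many driver-side handles such as open
    connections or SparkContext references) cannot be pickled.
    """

    def __init__(self):
        self._lock = threading.Lock()

    def transform(self, row):
        return row


def is_picklable(obj):
    """Return True if `obj` survives a round through pickle.dumps."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


vec = Vectorizer()

# The captured instance is the problem, not mapPartitions itself:
print(is_picklable(vec))          # False: the lock blocks serialization
print(is_picklable(Vectorizer))   # True: the class pickles by reference
```

If the real vectorizer fails the same check, the usual remedies are to construct it inside the partition function or to broadcast a picklable version of it.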

【Question Discussion】:

Tags: apache-spark pyspark


    【Solution 1】:

    Assuming your rdd is defined by the following data:

    data = [("row1", [[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]]), ("row2", [[7.7, 8.8, 9.9], [10.10, 11.11, 12.12]])]
    

    Then you can use the toDF() method to infer the data types, in this case string and array<array<double>>:

    >>> sc.parallelize(data).toDF(["msn", "Input_Tensor"])
    DataFrame[msn: string, Input_Tensor: array<array<double>>]
    

    Final result:

    >>> sc.parallelize(data).toDF(["msn", "Input_Tensor"]).show(truncate=False)
    +----+---------------------------------------+
    |msn |Input_Tensor                           |
    +----+---------------------------------------+
    |row1|[[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]]     |
    |row2|[[7.7, 8.8, 9.9], [10.1, 11.11, 12.12]]|
    +----+---------------------------------------+
    

    However, if the tensor is declared in the schema as a nested array of doubles, you can still use the createDataFrame method:

    from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType
    data = [("row1", [[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]]), ("row2", [[7.7, 8.8, 9.9], [10.10, 11.11, 12.12]])]
    rdd = sc.parallelize(data)
    schema = StructType([
                 StructField("msn", StringType(), True),
                 StructField("Input_Tensor", ArrayType(ArrayType(DoubleType())), True)])
    spark.createDataFrame(rdd, schema=schema).show(truncate=False)
    

    【Discussion】:

    • Thanks, I was using toDF() and I hit the same error, although in my case it came from schema inference. It may be because I did not define my 2D array as ArrayType(ArrayType(DoubleType())). The code is running, I'll let you know if that works.
    • I've updated my post; I think the problem comes from the step above, since my RDD is the result of mapPartitions.
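    Since the comments point at the mapPartitions closure as the culprit, one fix worth sketching is to build the transformer inside the partition function, so that the closure captures only picklable constructor arguments rather than the driver-side object itself. This is a hypothetical sketch; the real vectorizer's constructor arguments are unknown, so a plain float `scale` stands in for them:

    ```python
    def transform_partition(rows, scale=2.0):
        # Hypothetical per-partition transformer, built on the executor.
        # Only `scale` (a plain, picklable float) travels in the closure.
        def vectorize(row):
            msn, tensor = row
            return msn, [[v * scale for v in vec] for vec in tensor]

        for row in rows:
            yield vectorize(row)


    # Spark usage (sketch):
    # rdd = test_data.mapPartitions(transform_partition)

    # Plain-Python check of the same per-partition logic:
    part = [("row1", [[1.0, 2.0], [3.0, 4.0]])]
    print(list(transform_partition(part)))
    # [('row1', [[2.0, 4.0], [6.0, 8.0]])]
    ```

    Because transform_partition is a top-level function that closes over nothing unserializable, cloudpickle can ship it to the executors without hitting the PicklingError.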