【问题标题】:In Spark, How to convert multiple dataframes into an avro?在 Spark 中,如何将多个数据帧转换为 avro?
【发布时间】:2023-03-22 10:16:01
【问题描述】:

我有一个 Spark 作业,将一些数据处理成几个单独的数据帧。我将这些数据帧存储在一个列表中,即数据帧 []。最终,我想将这些数据帧组合成分层格式,并将输出写入 avro。 avro 架构是这样的:

{
    "name": "mydata",
    "type": "record",
    "fields": [
        {"name": "data", "type": {
            "type": "array", "items": {
                "name": "actualData", "type": "record", "fields": [
                    {"name": "metadata1", "type": "int"},
                    {"name": "metadata2", "type": "string"},
                    {"name": "dataframe", "type": {
                        "type": "array", "items": {
                            "name": "dataframeRecord", "type": "record", "fields": [
                                {"name": "field1", "type": "int"},
                                {"name": "field2", "type": "int"},
                                {"name": "field3", "type": ["string", "null"]}]
                            }
                        }
                    }]
                }
            }
        }
    ]
}

可以推断,每个数据帧都有三个字段,field1、field2 和 field3,我想将它们作为数组写入 avro 文件。还有一些与每个数据帧相关的元数据。

我目前的做法是,一旦处理完这些数据,将数据帧写入 S3,然后使用单独的程序从 S3 中提取这些数据,使用 avro 库编写 avro 文件,然后将其上传到 S3再次。

但是,随着数据量的增长,这变得非常缓慢。我已经查看了 databricks 库以直接编写 avro 文件,但我不知道如何在内存中将数据帧组合在一起,或者 databricks 库如何确定我正在使用的架构。

在 Spark 中有没有一种惯用的方法来做到这一点?

附:我在 Python 中使用 EMR 和 Spark 2.0.0。

【问题讨论】:

    标签: apache-spark pyspark avro emr spark-avro


    【解决方案1】:

    如果架构相同,并且您只想将所有记录放入同一个 DataFrame 中,则可以使用 DataFrame unionAll 方法。

    http://spark.apache.org/docs/1.6.3/api/python/pyspark.sql.html#pyspark.sql.DataFrame.unionAll

    此函数将获取一个数据帧并将其附加到另一个数据帧。问题是它假设列在两者之间的顺序相同,因此您可能需要做一些工作以使它们对齐并为缺少的任何列创建空列。这是我用来安全合并多个数据帧的python函数

    def union_multiple_dataframes(iterable_list_df):
        input_dfs = list(iterable_list_df)
    
        # First figure out all the field names
        field_types = {}
        for df in input_dfs:
            for field in df.schema.fields:
                # Check for type mismatch
                if field in field_types:
                    if field.dataType != field_types[field.name]:
                        raise ValueError("Mismatched data types when unioning dataframes for field: {}".format(field))
                else:
                    field_types[field.name] = field.dataType
    
        # First add in empty fields so all df's have the same schema
        fields = set(field_types.keys())
        for i, df in enumerate(input_dfs):
            missing = fields - set(df.schema.names)
            for field in missing:
                df = df.withColumn(field, F.lit(None))
    
            input_dfs[i] = df
    
        # Finally put all the df's columns in the same order, and do the actual union
        sorted_dfs = [df.select(*sorted(fields)) for df in iterable_list_df]
        return reduce(lambda x, y: x.unionAll(y), sorted_dfs)
    

    示例用法如下:

    input_dfs = [do_something(..) for x in y]
    combined_df = union_multiple_dataframes(input_dfs)
    combined_df.write.format("com.databricks.spark.avro").save("s3://my-bucket/path")
    

    【讨论】:

    • 感谢您花时间回答。我的数据框确实遵循相同的格式,但在组合它们之前,我需要用一些元数据包装每个数据框。在我的问题中,我列出了我正在使用的架构。在该架构中,我的数据框中包含的数据仅在“dataframeRecord”部分的范围内。如何在合并之前将元数据字段添加到我的数据框中?
    • 输入 DF 的每一行的元数据是否相同?你可以在联合之前将元数据附加到每个 DF 上吗?
    • 它们不一样,但是通过一些代码我确定可以在联合之前处理数据。但是,如何将元数据附加到数据框?
    • 您能否详细说明您是如何生成元数据的?如果它基于您的行数据,您可以在输入 DF 上使用 withColumn。否则,数据框连接可能会满足您的需求。如果不更好地了解您要做什么,就很难知道。
    • 我想出了一个解决方案,虽然有点老套。我会在下面发布。
    【解决方案2】:

    我已经找到了针对 PySpark 的解决方案:

    对于每个数据框,我使用 .collect() 来获取行列表。对于每个 Row 对象,我调用 asDict() 来获取字典。从那里,我能够用一个简单的循环构造一个字典列表。一旦我有了这个字典列表,数据就会退出 Spark 并进入纯 Python 领域,并且“更容易”处理(但效率较低)。

    另外,如果我选择了 Scala 而不是 Python,我可能能够将数据框转换为数据集,这似乎提供了一些方法来执行我需要的操作,但这完全是另一回事。

    【讨论】:

      猜你喜欢
      • 2018-09-01
      • 1970-01-01
      • 1970-01-01
      • 2016-01-03
      • 1970-01-01
      • 2019-04-29
      • 2022-01-09
      • 2016-04-21
      相关资源
      最近更新 更多