【Question Title】: Flatten nested array in Spark DataFrame
【Posted】: 2021-03-04 14:23:20
【Question Description】:

I'm reading in some JSON of the following form:

{"a": [{"b": {"c": 1, "d": 2}}]}

That is, the array items are unnecessarily nested. Now, because this happens inside an array, the answers given in How to flatten a struct in a Spark dataframe? don't apply directly.

This is what the dataframe looks like when parsed:

root
|-- a: array
|    |-- element: struct
|    |    |-- b: struct
|    |    |    |-- c: integer
|    |    |    |-- d: integer

I would like to transform the dataframe into:

root
|-- a: array
|    |-- element: struct
|    |    |-- b_c: integer
|    |    |-- b_d: integer

How can I alias the columns inside the array in order to unnest it effectively?

【Question Discussion】:

    Tags: python apache-spark pyspark


    【Solution 1】:

    You can use transform:

    df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")
    

    【Discussion】:

      【Solution 2】:

      Using the approach described in the accepted answer, I wrote a function that recursively unnests a dataframe (recursing into nested arrays as well):

      from pyspark.sql.types import ArrayType, StructType
      
      def flatten(df, sentinel="x"):
          def _gen_flatten_expr(schema, indent, parents, last, transform=False):
              def handle(field, last):
                  path = parents + (field.name,)
                  alias = (
                      " as "
                      + "_".join(path[1:] if transform else path)
                      + ("," if not last else "")
                  )
                  if isinstance(field.dataType, StructType):
                      yield from _gen_flatten_expr(
                          field.dataType, indent, path, last, transform
                      )
                  elif (
                      isinstance(field.dataType, ArrayType) and
                      isinstance(field.dataType.elementType, StructType)
                  ):
                      yield indent, "transform("
                      yield indent + 1, ".".join(path) + ","
                      yield indent + 1, sentinel + " -> struct("
                      yield from _gen_flatten_expr(
                          field.dataType.elementType, 
                          indent + 2, 
                          (sentinel,), 
                          True, 
                          True
                      )
                      yield indent + 1, ")"
                      yield indent, ")" + alias
                  else:
                      yield (indent, ".".join(path) + alias)
      
              try:
                  *fields, last_field = schema.fields
              except ValueError:
                  pass
              else:
                  for field in fields:
                      yield from handle(field, False)
                  yield from handle(last_field, last)
      
          lines = []
          for indent, line in _gen_flatten_expr(df.schema, 0, (), True):
              spaces = " " * 4 * indent
              lines.append(spaces + line)
      
          expr = "struct(" + "\n".join(lines) + ") as " + sentinel
          return df.selectExpr(expr).select(sentinel + ".*")
      

      【Discussion】:

      • This did nothing for me
      • @Garglesoap Could you reduce your problem to a short example you can share here?
      • Sorry, I was frustrated. I found that this works: newdf = result.withColumn("sentiment", explode("sentiment")).select("*", col("sentiment.*")).drop("document","sentence","tokens","word_embeddings","sentence_embeddings","sentiment")
      【Solution 3】:

      A simplified approach:

      from pyspark.sql.functions import col
      
      def flatten_df(nested_df):
          stack = [((), nested_df)]
          columns = []
      
          while len(stack) > 0:
              parents, df = stack.pop()
      
              flat_cols = [
                  col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
                  for c in df.dtypes
                  if c[1][:6] != "struct"
              ]
      
              nested_cols = [
                  c[0]
                  for c in df.dtypes
                  if c[1][:6] == "struct"
              ]
      
              columns.extend(flat_cols)
      
              for nested_col in nested_cols:
                  projected_df = df.select(nested_col + ".*")
                  stack.append((parents + (nested_col,), projected_df))
      
          return nested_df.select(columns)
      

      Reference: https://docs.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema

      【Discussion】:
