[Question Title]: Transform several Dataframe rows into a single row
[Posted]: 2019-07-21 12:22:38
[Question Description]:

Here is a sample Dataframe snippet:

+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_lid               |trace                           |message                                                                                                                                                                                                                                                                                                                                                                            |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1103960793391132675|47c10fda9b40407c998c154dc71a9e8c|[app.py:208] Prediction label: {"id": 617, "name": "CENSORED"}, score=0.3874854505062103                                                                                                                                                                                                                                                                                           |
|1103960793391132676|47c10fda9b40407c998c154dc71a9e8c|[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201]                                                                                                                                                                                                                                                                                                           |
|1103960793391132677|47c10fda9b40407c998c154dc71a9e8c|[app.py:317] Predict=s3://CENSORED/scan_4745/scan4745_t1_r0_c9_2019-07-15-10-32-43.jpg trait_id=112 result=InferenceResult(predictions=[Prediction(label_id='230', label_name='H3', probability=0.0), Prediction(label_id='231', label_name='Other', probability=1.0)], selected=Prediction(label_id='231', label_name='Other', probability=1.0)). Took 1.3637824058532715 seconds |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I have millions of similar log-like structures, all of which can be grouped by the trace, which is unique per session.

I want to transform each such set of rows into a single row, essentially mapping over them. For this example, I would extract "id": 617 from the first row, the values 0.6530804801919593, 0.6359653379418201 from the second row, and the selected Prediction(label_id='231', label_name='Other', probability=1.0) from the third row, and put them into one row.

I would then compose a new table with the columns:

| trace | id | similarity | selected |

with the values:

| 47c10fda9b40407c998c154dc71a9e8c | 617 | 0.6530804801919593, 0.6359653379418201 | 231 |

How should I implement this grouped-map transformation over multiple rows in pyspark?
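For reference, the per-message extractions can be prototyped in plain Python with the re module before porting anything to Spark. The strings below are shortened copies of the sample rows, and the regexes are my own guesses at patterns that isolate the wanted pieces:

```python
import re

# Shortened copies of the three sample "message" values from the question
messages = [
    '[app.py:208] Prediction label: {"id": 617, "name": "CENSORED"}, score=0.387',
    '[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201]',
    "[app.py:317] Predict=scan.jpg selected=Prediction(label_id='231', label_name='Other', probability=1.0)",
]

def extract_fields(msgs):
    """Reduce one trace's messages to a single {id, similarity, selected} record."""
    record = {}
    for m in msgs:
        id_match = re.search(r'"id": (\d+)', m)
        if id_match:
            record["id"] = id_match.group(1)
        sim_match = re.search(r'Similarity values: \[([^\]]+)\]', m)
        if sim_match:
            record["similarity"] = sim_match.group(1)
        sel_match = re.search(r"selected=Prediction\(label_id='(\d+)'", m)
        if sel_match:
            record["selected"] = sel_match.group(1)
    return record
```

Running extract_fields(messages) yields one combined record per trace; the same patterns can then be dropped into Spark's regexp_extract.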

[Question Discussion]:

    Tags: dataframe apache-spark pyspark apache-spark-sql


    [Solution 1]:

    For convenience, I wrote the following example in Scala, but it should be easy to translate to Pyspark.

    1) Create new columns in the dataframe via regexp_extract on the "message" field. This produces the desired value where the regex matches, and an empty string otherwise:

    scala> val dss = ds.select(
         | 'trace, 
         | regexp_extract('message, "\"id\": (\\d+),", 1) as "id", 
         | regexp_extract('message, "Similarity values: \\[(\\-?[0-9\\.]+, \\-?[0-9\\.]+)\\]", 1) as "similarity", 
         | regexp_extract('message, "selected=Prediction\\(label_id='(\\d+)'", 1) as "selected"
         | )
    dss: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]
    
    scala> dss.show(false)
    +--------------------------------+---+--------------------------------------+--------+
    |trace                           |id |similarity                            |selected|
    +--------------------------------+---+--------------------------------------+--------+
    |47c10fda9b40407c998c154dc71a9e8c|617|                                      |        |
    |47c10fda9b40407c998c154dc71a9e8c|   |0.6530804801919593, 0.6359653379418201|        |
    |47c10fda9b40407c998c154dc71a9e8c|   |                                      |231     |
    +--------------------------------+---+--------------------------------------+--------+
    

    2) Group by "trace" and eliminate the cases where the regex did not match. The quick and dirty way (shown below) is to take the max of each column, but you may need to do something more sophisticated if you expect more than one match per trace:

    scala> val ds_final = dss.groupBy('trace).agg(max('id) as "id", max('similarity) as "similarity", max('selected) as "selected")
    ds_final: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]
    
    scala> ds_final.show(false)
    +--------------------------------+---+--------------------------------------+--------+
    |trace                           |id |similarity                            |selected|
    +--------------------------------+---+--------------------------------------+--------+
    |47c10fda9b40407c998c154dc71a9e8c|617|0.6530804801919593, 0.6359653379418201|231     |
    +--------------------------------+---+--------------------------------------+--------+
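The max trick works because a failed regexp_extract yields an empty string, and in lexicographic ordering '' sorts before any non-empty value. A plain-Python sanity check of the same collapse, using toy rows of my own as a stand-in for groupBy/agg:

```python
# Rows as (trace, id, similarity, selected) after the regexp_extract step;
# non-matches are empty strings
rows = [
    ("47c10fda", "617", "", ""),
    ("47c10fda", "", "0.6530, 0.6359", ""),
    ("47c10fda", "", "", "231"),
]

def collapse(rows):
    """Group by trace and take the column-wise max, mimicking groupBy/agg(max)."""
    grouped = {}
    for trace, *cols in rows:
        prev = grouped.setdefault(trace, [""] * len(cols))
        # '' < any non-empty string, so the empty non-matches are discarded
        grouped[trace] = [max(a, b) for a, b in zip(prev, cols)]
    return grouped
```

This collapses the three sparse rows into one dense record per trace, exactly as the agg(max(...)) calls do.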
    

    [Discussion]:

      [Solution 2]:

      I ended up using something along the lines of:
      import re

      import pandas as pd
      from pyspark.sql import functions as F
      from pyspark.sql.types import StructType, StructField, TimestampType, StringType

      expected_schema = StructType([
         StructField("event_timestamp", TimestampType(), False),
         StructField("trace", StringType(), False),
         ...
      ])
      
      @F.pandas_udf(expected_schema, F.PandasUDFType.GROUPED_MAP)
      def transform(pdf):
        # Input and output are both a pandas.DataFrame
        output = {}
      
        # orient='records' (plural) yields one dict per row
        for l in pdf.to_dict(orient='records'):
          # Strip the "[file:line]" prefix, keeping the message body
          x = re.findall(r'^(\[.*:\d+\]) (.*)', l['message'])[0][1]
          ...
      
        return pd.DataFrame(data=[output])
      
      df.groupby('trace').apply(transform)
      
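The grouped-map idea above can be checked without Spark: group the log rows by trace, then reduce each group's messages with the same prefix-stripping regex. This is a plain-Python sketch with toy data of my own, not the elided production logic:

```python
import re
from collections import defaultdict

# Toy log rows: (trace, message)
logs = [
    ("trace_a", '[app.py:208] Prediction label: {"id": 617, "name": "X"}'),
    ("trace_a", "[app.py:224] Similarity values: [0.65, 0.63]"),
    ("trace_a", "[app.py:317] selected=Prediction(label_id='231')"),
]

def grouped_map(rows):
    """Group rows by trace, stripping the "[file:line]" prefix from each message."""
    groups = defaultdict(list)
    for trace, message in rows:
        # Same prefix-stripping regex as the UDF above
        body = re.findall(r'^(\[.*:\d+\]) (.*)', message)[0][1]
        groups[trace].append(body)
    return dict(groups)
```

Each group's stripped message bodies would then be parsed into the one-row output record that the pandas UDF returns.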

      [Discussion]:
