【问题标题】:Unzip pyspark pipelineRDD of python dicts to pyspark Dataframe将 python dicts 的 pyspark pipelineRDD 解压缩到 pyspark Dataframe
【发布时间】:2021-07-10 20:43:57
【问题描述】:

我正在使用平面图来解析数据框,它工作正常,但我无法将最终结果重塑为多列数据集。我怎样才能解析这个RDD?这是我在平面图之后的结果示例行:

[Row(XXXX-XXXX-XXXX-XXXXX-XXXXXX={'m_ci_id': 'XXXX-XXXX-XXXX-XXXXX-XXXXXX', 'ci_id': 'XXXX-XXXX-XXXX-XXXXX-XXXXXX', 'pp_breaker_power_phase': 'L1_L2', 'pp_breaker_poles': 2, 'pp_breaker_panel_circuit_number': 2, 'cp_ci_id': None, 'cp_value': None, 'phase': 'L1', 'pole': 2})]

我正在传递一个数据框,其列与您在 dict 中看到的列相同,这是我与平面图一起使用的函数:

def get_poles_phases(row):
    """
    :param row:
    :return:
    """
    new_rows = []
    initial_pole = row.pp_breaker_panel_circuit_number
    phases = row.pp_breaker_power_phase.split('_')

    for _ in range(row.pp_breaker_poles):
        temp = row.asDict()
        temp['phase'] = phases[_]
        temp['pole'] = initial_pole

        if row.cp_value != 'Phase Grouping':
            initial_pole += 2
        else:
            logger.error('Panel configuration not recognized.')
        
        new_rows.append(row(temp))

    return new_rows

我尝试使用 Structfields 架构,但没有成功

 cols = [StructField('m_ci_id', StringType(), True),
         StructField('ci_id', StringType(), True),
         StructField('pp_breaker_power_phase', StringType(), True),
         StructField('pp_breaker_poles', StringType(), True),
         StructField('pp_breaker_panel_circuit_number', StringType(), True),
         StructField('cp_ci_id', StringType(), True),
         StructField('cp_value', StringType(), True),
         StructField('phase', StringType(), True),
         StructField('pole', StringType(), True)]

schema = StructType(cols)
poles_phases = poles_phases.toDF(schema)

我也尝试过传递列名列表。

poles_phases = poles_phases.toDF(['m_ci_id', 'ci_id', 'pp_breaker_power_phase', 'pp_breaker_poles', 'pp_breaker_panel_circuit_number', 'cp_ci_id', 'cp_value', 'phase', 'pole'])

我怀疑这不起作用,因为我得到一个只有一列的 RDD,但我不知道如何解析该单个 dict 以便架构匹配。

【问题讨论】:

    标签: python python-3.x apache-spark pyspark flatmap


    【解决方案1】:

    我想通了:

    from pyspark.sql import Row
    poles_phases = poles_phases.map(lambda row: Row(**list(row.asDict().values())[0]))
    

    这是通过解压值字典来构建一个新行。之后就可以使用了

    poles_phases = poles_phases.toDF(['m_ci_id', 'ci_id', 'pp_breaker_power_phase', 'pp_breaker_poles', 'pp_breaker_panel_circuit_number', 'cp_ci_id', 'cp_value', 'phase', 'pole'])
    

    如果您有 None 值,架构推断可能会失败,因此您需要明确声明它,例如,

    cols = [StructField('m_ci_id', StringType(), True),
            StructField('ci_id', StringType(), True),
            StructField('pp_breaker_power_phase', StringType(), True),
            StructField('pp_breaker_poles', StringType(), True),
            StructField('pp_breaker_panel_circuit_number', StringType(), True),
            StructField('cp_ci_id', StringType(), True),
            StructField('cp_value', StringType(), True),
            StructField('phase', StringType(), True),
            StructField('pole', StringType(), True)]
    
    schema = StructType(cols)
    poles_phases = poles_phases.toDF(schema)
    

    【讨论】:

      猜你喜欢
      • 2020-05-21
      • 1970-01-01
      • 1970-01-01
      • 2022-01-19
      • 1970-01-01
      • 2018-07-04
      • 1970-01-01
      • 2021-07-05
      • 1970-01-01
      相关资源
      最近更新 更多