【问题标题】:pyspark dataframe modifying columnspyspark数据框修改列
【发布时间】:2020-04-11 15:36:17
【问题描述】:

我有如下输入数据框,其中输入列是动态的,即它可以是 n 个数字 - 比如 input1 到 input2

+----+----+-------+------+------+
|dim1|dim2|  byvar|input1|input2|
+----+----+-------+------+------+
| 101| 102|MTD0001|     1|    10|
| 101| 102|MTD0002|     2|    12|
| 101| 102|MTD0003|     3|    13|

想修改如下列,怎么可能?

+----+----+-------+----------+------+
|dim1|dim2|  byvar|TRAMS_NAME|values|
+----+----+-------+----------+------+
| 101| 102|MTD0001|    input1|     1|
| 101| 102|MTD0001|    input2|    10|
| 101| 102|MTD0002|    input1|     2|
| 101| 102|MTD0002|    input2|    12|
| 101| 102|MTD0003|    input1|     3|
| 101| 102|MTD0003|    input2|    13|

我使用了 create_map spark 方法,但它是硬编码的方法。还有其他方法可以达到同样的效果吗??

【问题讨论】:

  • 为什么TRAMS_NAME input1 和 input2 的前两行是 value1 和 value 2?它们不应该都是value1还是value2?
  • 是我的错。我已经更新了,请检查。@Mohammad Murtaza Hashmi

标签: dataframe pyspark hive pyspark-dataframes


【解决方案1】:

这是使用 stack() 函数解决问题的另一种方法。当然,它可能会更简单一些,但您必须明确输入列名。

希望这会有所帮助!

# set your dataframe
df = spark.createDataFrame(
    [(101, 102, 'MTD0001', 1, 10),
     (101, 102, 'MTD0002', 2, 12),
     (101, 102, 'MTD0003', 3, 13)],
    ['dim1', 'dim2', 'byvar', 'v1', 'v2']
)

df.show()
+----+----+-------+---+---+
|dim1|dim2|  byvar| v1| v2|
+----+----+-------+---+---+
| 101| 102|MTD0001|  1| 10|
| 101| 102|MTD0002|  2| 12|
| 101| 102|MTD0003|  3| 13|
+----+----+-------+---+---+

result = df.selectExpr('dim1', 
                       'dim2', 
                       'byvar', 
                       "stack(2, 'v1', v1, 'v2', v2) as (names, values)")
result.show()
+----+----+-------+-----+------+
|dim1|dim2|  byvar|names|values|
+----+----+-------+-----+------+
| 101| 102|MTD0001|   v1|     1|
| 101| 102|MTD0001|   v2|    10|
| 101| 102|MTD0002|   v1|     2|
| 101| 102|MTD0002|   v2|    12|
| 101| 102|MTD0003|   v1|     3|
| 101| 102|MTD0003|   v2|    13|
+----+----+-------+-----+------+

如果我们想动态设置要堆叠的列,我们只需要设置未更改的列,在您的示例中是 dim1dim2byvar 并使用 for 循环创建堆栈语句。

# set static columns
unaltered_cols = ['dim1', 'dim2', 'byvar']
# extract columns to stack
change_cols = [n for n in df.schema.names if not n in unaltered_cols]
cols_exp = ",".join(["'" + n + "'," + n for n in change_cols])
# create stack sentence
stack_exp = "stack(" + str(len(change_cols)) +',' + cols_exp + ") as (names, values)"
# print final expression
print(stack_exp)
# --> stack(2,'v1',v1,'v2',v2) as (names, values)

# apply transformation
result = df.selectExpr('dim1', 
                       'dim2', 
                       'byvar', 
                       stack_exp)
result.show()
+----+----+-------+-----+------+
|dim1|dim2|  byvar|names|values|
+----+----+-------+-----+------+
| 101| 102|MTD0001|   v1|     1|
| 101| 102|MTD0001|   v2|    10|
| 101| 102|MTD0002|   v1|     2|
| 101| 102|MTD0002|   v2|    12|
| 101| 102|MTD0003|   v1|     3|
| 101| 102|MTD0003|   v2|    13|
+----+----+-------+-----+------+

如果我们运行相同的代码但使用不同的数据框,您将获得所需的结果。

df = spark.createDataFrame(
    [(101, 102, 'MTD0001', 1, 10, 4),
     (101, 102, 'MTD0002', 2, 12, 5),
     (101, 102, 'MTD0003', 3, 13, 5)],
    ['dim1', 'dim2', 'byvar', 'v1', 'v2', 'v3']
)
# Re-run the code to create the stack_exp before!
result = df.selectExpr('dim1', 
                       'dim2', 
                       'byvar', 
                       stack_exp)
result.show()
+----+----+-------+-----+------+
|dim1|dim2|  byvar|names|values|
+----+----+-------+-----+------+
| 101| 102|MTD0001|   v1|     1|
| 101| 102|MTD0001|   v2|    10|
| 101| 102|MTD0001|   v3|     4|
| 101| 102|MTD0002|   v1|     2|
| 101| 102|MTD0002|   v2|    12|
| 101| 102|MTD0002|   v3|     5|
| 101| 102|MTD0003|   v1|     3|
| 101| 102|MTD0003|   v2|    13|
| 101| 102|MTD0003|   v3|     5|
+----+----+-------+-----+------+

【讨论】:

  • 谢谢,但是如果堆栈中有多个列,我们可以使用 for 循环传递值和列吗???
  • 我添加了解决多列问题的代码!
【解决方案2】:

Sample DataFrame:

df.show() #added more columns to show code is dynamic
+----+----+-------+------+------+------+------+------+------+
|dim1|dim2|  byvar|input1|input2|input3|input4|input5|input6|
+----+----+-------+------+------+------+------+------+------+
| 101| 102|MTD0001|     1|    10|     3|     6|    10|    13|
| 101| 102|MTD0002|     2|    12|     4|     8|    11|    14|
| 101| 102|MTD0003|     3|    13|     5|     9|    12|    15|
+----+----+-------+------+------+------+------+------+------+

对于 Spark2.4+,您可以使用 explodearrays_ziparray 动态执行此操作strong> 和 element_at 来获取您的 2 列。只要您的 输入列 的名称以 'input'

开头,这将起作用
from pyspark.sql import functions as F
df.withColumn("vals",\
F.explode(F.arrays_zip(F.array([F.array(F.lit(x),F.col(x)) for x in df.columns if x!=['dim1','dim2','byvar']]))))\
.select("dim1", "dim2","byvar","vals.*").withColumn("TRAMS_NAME", F.element_at("0",1))\
                                                    .withColumn("VALUES", F.element_at("0",2)).drop("0").show()

+----+----+-------+----------+------+
|dim1|dim2|  byvar|TRAMS_NAME|VALUES|
+----+----+-------+----------+------+
| 101| 102|MTD0001|    input1|     1|
| 101| 102|MTD0001|    input2|    10|
| 101| 102|MTD0001|    input3|     3|
| 101| 102|MTD0001|    input4|     6|
| 101| 102|MTD0001|    input5|    10|
| 101| 102|MTD0001|    input6|    13|
| 101| 102|MTD0002|    input1|     2|
| 101| 102|MTD0002|    input2|    12|
| 101| 102|MTD0002|    input3|     4|
| 101| 102|MTD0002|    input4|     8|
| 101| 102|MTD0002|    input5|    11|
| 101| 102|MTD0002|    input6|    14|
| 101| 102|MTD0003|    input1|     3|
| 101| 102|MTD0003|    input2|    13|
| 101| 102|MTD0003|    input3|     5|
| 101| 102|MTD0003|    input4|     9|
| 101| 102|MTD0003|    input5|    12|
| 101| 102|MTD0003|    input6|    15|
+----+----+-------+----------+------+

【讨论】:

  • 谢谢,但列是动态的,列标题也可能会改变。反正我会试试的。
  • 要做到这一点,您只需将 x.startswith('input') 替换为 if x!=['dim1','dim2','byvar'].. 我已编辑。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-05-11
  • 2018-01-06
  • 2018-01-09
  • 1970-01-01
  • 1970-01-01
  • 2018-10-05
相关资源
最近更新 更多