【问题标题】:Select columns that satisfy a condition选择满足条件的列
【发布时间】:2017-10-22 03:28:51
【问题描述】:

我在 zeppelin 中运行以下笔记本:

%spark.pyspark
l = [('user1', 33, 1.0, 'chess'), ('user2', 34, 2.0, 'tenis'), ('user3', None, None, ''), ('user4', None, 4.0, '   '), ('user5', None, 5.0, 'ski')]
df = spark.createDataFrame(l, ['name', 'age', 'ratio', 'hobby'])
df.show()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- ratio: double (nullable = true)
 |-- hobby: string (nullable = true)
+-----+----+-----+-----+
| name| age|ratio|hobby|
+-----+----+-----+-----+
|user1|  33|  1.0|chess|
|user2|  34|  2.0|tenis|
|user3|null| null|     |
|user4|null|  4.0|     |
|user5|null|  5.0|  ski|
+-----+----+-----+-----+

agg_df = df.select(*[(1.0 - (count(c) / count('*'))).alias(c) for c in df.columns])
agg_df.show()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- ratio: double (nullable = true)
 |-- hobby: string (nullable = true)
+----+---+-------------------+-----+
|name|age|              ratio|hobby|
+----+---+-------------------+-----+
| 0.0|0.6|0.19999999999999996|  0.0|
+----+---+-------------------+-----+

现在,我只想在 agg_df 中选择值

我不知道该怎么做。有什么提示吗?

【问题讨论】:

  • 您要比较哪一列的值?

标签: apache-spark pyspark spark-dataframe apache-zeppelin pyspark-sql


【解决方案1】:

您的意思是值

>>> [ key for (key,value) in agg_df.collect()[0].asDict().items() if value < 0.35  ]
['hobby', 'ratio', 'name']

使用以下 udf 函数将空白字符串替换为 Null。

from pyspark.sql.functions import udf
process = udf(lambda x: None if not x else (x if x.strip() else None))
df.withColumn('hobby', process(df.hobby)).show()
+-----+----+-----+-----+
| name| age|ratio|hobby|
+-----+----+-----+-----+
|user1|  33|  1.0|chess|
|user2|  34|  2.0|tenis|
|user3|null| null| null|
|user4|null|  4.0| null|
|user5|null|  5.0|  ski|
+-----+----+-----+-----+

【讨论】:

  • 如何在运行聚合之前用 None 替换空白字符串?在业余爱好中,我应该有 2 个空值。
  • @SofianeCherchalli 我已经更新了用空值替换空白字符串的答案。你能接受答案吗? :)
  • 完成。抱歉回复晚了
  • udf 函数只适用于字符串类型,不是吗?为了将其应用于整个数据框,我想我应该选择字符串类型的列,然后使用您的 udf 进行减少?
  • @SofianeCherchalli 是的 udf 类型仅适用于字符串。理想情况下,您应该仅将 udf 函数应用于必须处理的字符串列。
【解决方案2】:

这是我根据rogue-one 指示寻找的功能的尝试。不确定它是最快的还是最优化的:

from pyspark.sql.functions import udf, count
from functools import reduce

def filter_columns(df, threshold=0.35):
        process = udf(lambda x: None if not x else (x if x.strip() else None)) # udf for stripping string values
        string_cols = ([c for c in df.columns if df.select(c).dtypes[0][1] == 'string']) # string columns
        new_df = reduce(lambda df, x: df.withColumn(x, process(x)), string_cols, df) # process all string columns

        agg_df = new_df.select(*[(1.0 - (count(c) / count('*'))).alias(c) for c in new_df.columns]) # compute non-null/df.count ratio
        cols_match_threshold = [ key for (key, value) in agg_df.collect()[0].asDict().items() if value < threshold ] # select only cols which value < threshold

        return new_df.select(cols_match_threshold)



filter_columns(df, 0.35).show()
+-----+-----+
|ratio| name|
+-----+-----+
|  1.0|user1|
|  2.0|user2|
| null|user3|
|  4.0|user4|
|  5.0|user5|
+-----+-----+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-04-24
    • 2015-09-29
    • 2017-01-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-20
    相关资源
    最近更新 更多