【问题标题】:pyspark dataframe filter or include based on listpyspark 数据框过滤器或包含基于列表
【发布时间】:2017-03-18 05:43:37
【问题描述】:

我正在尝试使用列表过滤 pyspark 中的数据框。我想根据列表进行过滤或仅包含列表中具有值的那些记录。我下面的代码不起作用:

# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [10,18,20]

# filter out records by scores by list l
records = df.filter(df.score in l)
# expected: (0,1), (0,1), (0,2), (1,2)

# include only records with these scores in list l
records = df.where(df.score in l)
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)

给出以下错误: ValueError:无法将列转换为布尔值:请使用 '&' 表示 'and'、'|' for 'or', '~' for 'not' 在构建 DataFrame 布尔表达式时。

【问题讨论】:

    标签: apache-spark filter pyspark apache-spark-sql


    【解决方案1】:

    无法评估它所说的“df.score in l”,因为 df.score 为您提供了一个列,而“in”未在该列类型上定义使用“isin”

    代码应该是这样的:

    # define a dataframe
    rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
    df = sqlContext.createDataFrame(rdd, ["id", "score"])
    
    # define a list of scores
    l = [10,18,20]
    
    # filter out records by scores by list l
    records = df.filter(~df.score.isin(l))
    # expected: (0,1), (0,1), (0,2), (1,2)
    
    # include only records with these scores in list l
    df.filter(df.score.isin(l))
    # expected: (1,10), (1,20), (3,18), (3,18), (3,18)
    

    注意where() is an alias for filter(),所以两者可以互换。

    【讨论】:

    • 你将如何使用广播变量作为列表而不是常规 python 列表来做到这一点?当我尝试这样做时,我得到一个“广播”对象没有属性“_get_object_id”错误。
    • @flyingmeatball 我想你可以通过 broadcast_variable_name.value 访问列表
    • 如果你想使用广播,那么这是要走的路:l_bc = sc.broadcast(l) 后跟df.where(df.score.isin(l_bc.value))
    【解决方案2】:

    根据@user3133475的回答,也可以像这样从F.col()调用isin()方法:

    import pyspark.sql.functions as F
    
    
    l = [10,18,20]
    df.filter(F.col("score").isin(l))
    

    【讨论】:

      【解决方案3】:

      我发现join 实现对于大型数据帧比where 快得多:

      def filter_spark_dataframe_by_list(df, column_name, filter_list):
          """ Returns subset of df where df[column_name] is in filter_list """
          spark = SparkSession.builder.getOrCreate()
          filter_df = spark.createDataFrame(filter_list, df.schema[column_name].dataType)
          return df.join(filter_df, df[column_name] == filter_df["value"])
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-09-02
        • 2020-08-16
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-08-19
        • 2018-12-20
        • 2019-06-29
        相关资源
        最近更新 更多