【问题标题】:How to get list of records from matching two DataFrames?如何从匹配的两个 DataFrame 中获取记录列表?
【发布时间】:2021-01-16 10:09:54
【问题描述】:

我正在为一个项目开发 PySpark 脚本。
我有这样的输入数据框:

+---+---------+
| id|direction|
+---+---------+
|  2|       up|
|  3|       up|
|  4|     down|
|  5|       up|
|  6|     down|
|  7|     down|
+---+---------+

我处理的数据框是:

+----+---+---------+-----+
| day| id|direction|count|
+----+---+---------+-----+
|day1|  1|       up|   10|
|day1|  2|       up|   40|
|day1|  3|       up|   42|
|day1|  4|     down|   39|
|day1|  5|       up|   55|
|day1|  6|     down|   43|
|day1|  7|     down|   41|
|day2|  1|     down|   39|
|day2|  2|       up|   44|
|day2|  3|       up|   50|
|day2|  4|     down|   43|
|day2|  5|     down|   34|
|day2|  6|     down|   30|
|day2|  7|       up|   23|
|day3|  1|     down|   20|
|day3|  2|       up|   25|
|day3|  3|       up|   33|
|day3|  4|       up|   41|
|day3|  5|       up|   55|
|day3|  6|     down|   33|
|day3|  7|     down|   23|
|day4|  1|       up|   45|
|day4|  2|       up|   56|
|day4|  3|       up|   60|
|day4|  4|     down|   49|
|day4|  5|       up|   61|
|day4|  6|     down|   53|
|day4|  7|     down|   40|
|day5|  1|       up|   20|
|day5|  2|       up|   30|
|day5|  3|       up|   37|
|day5|  4|     down|   19|
|day5|  5|       up|   25|
|day5|  6|     down|   23|
|day5|  7|     down|   18|
|day6|  1|       up|   11|
|day6|  2|     down|    9|
|day6|  3|     down|    8|
|day6|  4|     down|    6|
|day6|  5|       up|   23|
|day6|  6|       up|   29|
|day6|  7|       up|   34|
+----+---+---------+-----+

我想绘制与我的输入数据框记录匹配的天数图表。
像这里一样,day1, day4, day5 与输入数据帧匹配。 因此,我的最终输出将是 3 个线图,分别表示第 1、4 和 5 天。x-axis as idy-axis as count

我使用了groupBy()join() 函数,但它不会给我确切的结果。 我也不确定如何绘制数据框结果。我必须先将其转换为list 之类的结构吗?

编辑:我创建了包含几行的临时数据框:

df_input = spark.createDataFrame({
    (2, 'up', ),
    (3, 'up', ),
    (4, 'down', ),
    (5, 'up', ),
    (6, 'down', ),
    (7, 'down', )
}, ['id', 'direction'])

df_input.sort('id').show()

df_proccsed_table = spark.createDataFrame({
    ('day1', 1, 'up', 10, ),
    ('day1', 2, 'up', 40, ),
    ('day1', 3, 'up', 42, ),
    ('day1', 4, 'down', 39, ),
    ('day1', 5, 'up', 55, ),
    ('day1', 6, 'down', 43, ),
    ('day1', 7, 'down', 41, ),
    ('day2', 1, 'down', 39, ),
    ('day2', 2, 'up', 44),
    ('day2', 3, 'up', 50),
    ('day2', 4, 'down', 43),
    ('day2', 5, 'down', 34),
    ('day2', 6, 'down', 30),
    ('day2', 7, 'up', 23),
    ('day3', 1, 'down', 20),
    ('day3', 2, 'up', 25),
    ('day3', 3, 'up', 33),
    ('day3', 4, 'up', 41),
    ('day3', 5, 'up', 55),
    ('day3', 6, 'down', 33),
    ('day3', 7, 'down', 23),
    ('day4', 1, 'up', 45),
    ('day4', 2, 'up', 56),
    ('day4', 3, 'up', 60),
    ('day4', 4, 'down', 49),
    ('day4', 5, 'up', 61),
    ('day4', 6, 'down', 53),
    ('day4', 7, 'down', 40),
    ('day5', 1, 'up', 20),
    ('day5', 2, 'up', 30),
    ('day5', 3, 'up', 37),
    ('day5', 4, 'down', 19),
    ('day5', 5, 'up', 25),
    ('day5', 6, 'down', 23),
    ('day5', 7, 'down', 18),
    ('day6', 1, 'up', 11),
    ('day6', 2, 'down', 9),
    ('day6', 3, 'down', 8),
    ('day6', 4, 'down', 6),
    ('day6', 5, 'up', 23),
    ('day6', 6, 'up', 29),
    ('day6', 7, 'up', 34),
}, ['day', 'id', 'direction', 'count'])
    
df_proccsed_table.sort('day', 'id').show(100)

【问题讨论】:

    标签: dataframe apache-spark matplotlib pyspark


    【解决方案1】:

    您可以进行左连接,并将id 中每组中的非空行数与df_input 中的行数进行比较,以确定模式是否完全匹配。

    import pyspark.sql.functions as F
    
    filtered = df_proccsed_table.join(
        df_input.toDF('id2', 'direction2'),
        F.expr('id = id2 and direction = direction2'),
        'left'
    ).withColumn(
        'match',
        F.expr(f'count(id2) over(partition by day) = {df_input.count()}')
    ).filter('match').drop('match', 'id2', 'direction2').sort('day', 'id')
    
    filtered.show(999)
    +----+---+---------+-----+
    | day| id|direction|count|
    +----+---+---------+-----+
    |day1|  1|       up|   10|
    |day1|  2|       up|   40|
    |day1|  3|       up|   42|
    |day1|  4|     down|   39|
    |day1|  5|       up|   55|
    |day1|  6|     down|   43|
    |day1|  7|     down|   41|
    |day4|  1|       up|   45|
    |day4|  2|       up|   56|
    |day4|  3|       up|   60|
    |day4|  4|     down|   49|
    |day4|  5|       up|   61|
    |day4|  6|     down|   53|
    |day4|  7|     down|   40|
    |day5|  1|       up|   20|
    |day5|  2|       up|   30|
    |day5|  3|       up|   37|
    |day5|  4|     down|   19|
    |day5|  5|       up|   25|
    |day5|  6|     down|   23|
    |day5|  7|     down|   18|
    +----+---+---------+-----+
    

    绘制结果:

    pdf = filtered.toPandas()
    pdf.set_index('id').groupby('day')['count'].plot(legend=True)
    
    import matplotlib.pyplot as plt
    plt.show()
    

    【讨论】:

    • 您能解释一下match 列结果是如何形成的吗?表示expr 代码中的作用是什么?
    • @VivekChaudhari match 是一个布尔列,指定每一天的非空id2 的计数是否等于df_input 的行数。如果是这样,则意味着df_input 中的整个模式都匹配了。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-08-15
    • 2021-04-23
    • 1970-01-01
    • 2021-11-26
    • 2016-11-03
    • 2021-09-08
    • 2017-05-09
    相关资源
    最近更新 更多