【问题标题】:pyspark, Compare two rows in dataframepyspark,比较数据框中的两行
【发布时间】:2016-07-06 17:03:17
【问题描述】:

我正在尝试将数据框中的一行与下一行进行比较,以查看时间戳的差异。目前数据如下:

 itemid | eventid | timestamp
 ----------------------------
 134    | 30      | 2016-07-02 12:01:40
 134    | 32      | 2016-07-02 12:21:23
 125    | 30      | 2016-07-02 13:22:56
 125    | 32      | 2016-07-02 13:27:07

我已经尝试将一个函数映射到数据框以允许像这样进行比较:(注意:我正在尝试获取差异大于 4 小时的行)

items = df.limit(10)\
          .orderBy('itemid', desc('stamp'))\
          .map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()

但我收到以下错误:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe

我认为这是由于我错误地使用了地图功能。使用地图的帮助,或其他解决方案将不胜感激。

更新: @zero323 的回答是关于我不正确使用映射的信息,但是我使用的系统运行的是 2.02 之前的 Spark 版本,并且我正在处理 Cassandra 中的数据。

我设法用 mapPartitions 解决了这个问题。请参阅下面的答案。

更新(2017 年 3 月 27 日): 自从最初在这篇文章中标记答案以来,我对 Spark 的理解有了显着提高。我在下面更新了我的答案以显示我当前的解决方案。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    是的,您以错误的方式使用 map 函数。 map 当时对单个元素进行操作。您可以尝试使用这样的窗口函数:

    from pyspark.sql.functions import col, lag
    from pyspark.sql.window import Window
    
    df = (
        sc.parallelize([
            (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
            (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
        ]).toDF(["itemid", "eventid", "timestamp"])
        .withColumn("timestamp", col("timestamp").cast("timestamp"))
    )
    
    w = Window.partitionBy("itemid").orderBy("timestamp")
    
    diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")
    
    df.withColumn("diff", diff)
    

    【讨论】:

    • 虽然请确保您在这里使用 HiveContext 或 Spark 2.02
    • 感谢您对 map 的深入了解,但显然需要 Hive 上下文才能使用 window。我正在使用的系统只是 Cassandra 的火花。我会更新问题以说明这一点。
    • HiveContext 每次查看都不需要 Hive。它只需要使用 Hive 支持构建的 Spark(这是预构建二进制文件的默认设置)。
    • 我有一个关于“diff =”行的问题。窗口本身是否没有范围函数,您可以在其中指定获取当前行和下一行?但你的解决方法不同。
    • @Matthias 对这里有什么帮助?
    【解决方案2】:

    @ShuaiYuan 对原始答案的评论是正确的。在过去的一年里,我对 Spark 的工作原理有了更深入的了解,并且实际上重写了我为这篇文章编写的程序。

    新答案 (2017/03/27)
    为了完成数据帧的两行比较,我最终使用了 RDD。我按键(在本例中为项目 ID)对数据进行分组并忽略 eventid,因为它在此等式中无关紧要。然后我将一个 lambda 函数映射到行上,返回一个键的元组和一个包含事件间隙的开始和结束的元组列表,它派生自“findGaps”函数,该函数迭代链接的值列表(排序的时间戳)到每个键。完成后,我会过滤掉没有时间间隔的键,然后使用 flatMapValues 将数据返回到更像 sql 的格式。这是通过以下代码完成的:

    # Find time gaps in list of datetimes where firings are longer than given duration.  
    def findGaps(dates, duration):
        result = []
        length = len(dates)
    
        # convert to dates for comparison
        first = toDate(dates[0])
        last = toDate(dates[length - 1])
        for index, item in enumerate(dates):
            if index < length -1 and (dates[index + 1] - item).total_seconds() > duration:
                # build outage tuple and append to list
                # format (start, stop, duration)
                result.append(formatResult(item, dates[index + 1], kind))
        return result
    
    outage_list = outage_join_df.rdd\
                                .groupByKey()\
                                .map(lambda row: (
                                         row[0],
                                         findGaps(
                                             sorted(list(row[1])), 
                                             limit
                                         )
                                      )
                                )\
                                .filter(lambda row: len(row[1]) > 0)\
                                .flatMapValues(lambda row: row)\
                                .map(lambda row: (
                                     row[0]['itemid'],     # itemid
                                     row[1][0].date(),     # date
                                     row[1][0],            # start
                                     row[1][1],            # stop
                                     row[1][2]             # duration
                                ))\
                                .collect()
    

    原始答案(错误)
    我设法使用 mapPartitions 解决了它:

    def findOutage(items):
        outages = []
    
        lastStamp = None
        for item in items:
            if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
                outages.append({"item": item.itemid, 
                                "start": item.stamp.isoformat(),
                                "stop": lastStamp.isoformat()})
            lastStamp = item.stamp
        return iter(outages)
    
    items = df.limit(10).orderBy('itemid', desc('stamp'))
    
    outages = items.mapPartitions(findOutage).collect()
    

    感谢大家的帮助!

    【讨论】:

    • 需要确保数据集按timestamp分区。
    • @ShuaiYuan 你是对的。我已经更新了我的答案,以展示我目前对该问题的解决方案。
    猜你喜欢
    • 2020-06-02
    • 2017-07-30
    • 1970-01-01
    • 1970-01-01
    • 2021-10-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多