【问题标题】:How to calculate difference between current and previous row in Spark JavaRDD如何计算 Spark JavaRDD 中当前行和上一行之间的差异
【发布时间】:2016-03-28 14:23:10
【问题描述】:

我将 .log 文件解析为 JavaRDD,在对这个 JavaRDD 进行排序之后,现在我有了,例如 oldJavaRDD:
2016-03-28 | 11:00 | X | object1 | region1
2016-03-28 | 11:01 | Y | object1 | region1
2016-03-28 | 11:05 | X | object1 | region1
@987654326 @
2016-03-28 | 11:00 | X | object2 | region1
2016-03-28 | 11:01 | Z | object2 | region1

如何获得newJavaRDD 以将其保存到数据库?
新的 JavaRDD 结构必须是:
2016-03-28 | 9 | object1 | region1
2016-03-28 | 1 | object2 | region1
所以,我必须计算当前行和上一行之间的时间(在某些情况下还使用标志X, Y, Z 来定义,是否添加时间到结果)并在更改date, objectNameobjectRegion 后将新元素添加到JavaRDD。

我可以使用这种类型的代码(ma​​p)来做到这一点,但我认为它不好,也不是最快的方式

    JavaRDD<NewObject> newJavaRDD = oldJavaRDD.map { r -> 
      String datePrev[] = ...
        if (datePrev != dateCurr ...) {
          return newJavaRdd;
        } else {
          return null;
        }
    }

【问题讨论】:

  • 你能解释一下X, Y, Z 的意思吗?不清楚哪些记录应该包含在输出中,哪些不应该...
  • 仅举例:上一行包含标志X,当前行包含Y,所以我们有过渡X-&gt;Y。在这种情况下,我们不能在这些行之间聚合时间,结果sum(11:01 - 11:00) = 0。如果Y-&gt;X,我们必须聚合行之间的时间,结果sum(11:05 - 11:01) = 4 minutes。如果X-&gt;X - 也聚合,结果4 minutes + sum(11:09 - 11:05) = 4 minutes + 4 minutes = 8 minutes。我还必须了解其他一些规则,但它们都与当前行和预览行之间的差异有关。

标签: java apache-spark rdd


【解决方案1】:

首先,您的代码示例从创建 newJavaRDD 的转换中引用newJavaRDD - 这在几个不同的级别上是不可能的:

  • 您不能引用该变量声明右侧的变量...
  • 您不能在 RDD 上的转换中使用 RDD(相同或另一个 - 没关系) - 转换中的任何内容都必须由 Spark 序列化,Spark 不能序列化自己的 RDD (这没有任何意义)

那么,你应该怎么做呢?

假设

  1. 您的意图是为date + object + region 的每个组合获取一条记录
  2. 每个这样的组合不应该有太多的记录,所以groupBy这些字段作为键是安全的

您可以groupBy 关键字段,然后mapValues 获取第一条记录和最后一条记录之间的“分钟距离”(如果我没有,可以更改传递给mapValues 的函数以包含您的确切逻辑修正它)。我将使用 Joda Time 库进行时间计算:

public static void main(String[] args) {
    // some setup code for this test:
    JavaSparkContext sc = new JavaSparkContext("local", "test");

    // input:
    final JavaRDD<String[]> input = sc.parallelize(Lists.newArrayList(
            //              date        time     ?    object     region
            new String[]{"2016-03-28", "11:00", "X", "object1", "region1"},
            new String[]{"2016-03-28", "11:01", "Y", "object1", "region1"},
            new String[]{"2016-03-28", "11:05", "X", "object1", "region1"},
            new String[]{"2016-03-28", "11:09", "X", "object1", "region1"},
            new String[]{"2016-03-28", "11:00", "X", "object2", "region1"},
            new String[]{"2016-03-28", "11:01", "Z", "object2", "region1"}
    ));

    // grouping by key:
    final JavaPairRDD<String, Iterable<String[]>> byObjectAndDate = input.groupBy(new Function<String[], String>() {
        @Override
        public String call(String[] record) throws Exception {
            return record[0] + record[3] + record[4]; // date, object, region
        }
    });

    // mapping each "value" (all record matching key) to result
    final JavaRDD<String[]> result = byObjectAndDate.mapValues(new Function<Iterable<String[]>, String[]>() {
        @Override
        public String[] call(Iterable<String[]> records) throws Exception {
            final Iterator<String[]> iterator = records.iterator();
            String[] previousRecord = iterator.next();
            int diffMinutes = 0;

            for (String[] record : records) {
                if (record[2].equals("X")) {  // if I got your intention right...
                    final LocalDateTime prev = getLocalDateTime(previousRecord);
                    final LocalDateTime curr = getLocalDateTime(record);
                    diffMinutes += Period.fieldDifference(prev, curr).toStandardMinutes().getMinutes();
                }
                previousRecord = record;
            }

            return new String[]{
                    previousRecord[0],
                    Integer.toString(diffMinutes),
                    previousRecord[3],
                    previousRecord[4]
            };
        }
    }).values();

    // do whatever with "result"...
}

// extracts a Joda LocalDateTime from a "record"
static LocalDateTime getLocalDateTime(String[] record) {
    return LocalDateTime.parse(record[0] + " " + record[1], formatter);
}

static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm");

附:在 Scala 中,这大约需要 8 行...:/

【讨论】:

  • 抱歉,我被伪代码弄糊涂了,newJavaRDD 是对的,我的意思是 return new NewObject(...)。没关系,你的回答真的很有帮助并且有效(幸运的是,我可以使用 java8 来减少愚蠢的行)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-10-29
  • 1970-01-01
  • 2023-02-02
  • 2022-01-16
  • 2017-12-24
  • 2023-03-19
  • 1970-01-01
相关资源
最近更新 更多