【问题标题】:Fast sorting/Filtering based on alternating values基于交替值的快速排序/过滤
【发布时间】:2017-06-06 14:25:34
【问题描述】:

考虑以下示例数据表,

dt <- data.table(src = LETTERS[1:10], 
                 dst = LETTERS[10:1], 
                 src1 = letters[15:24], 
                 dst1 = letters[24:15])

#which looks like,
#    src dst src1 dst1
# 1:   A   J    o    x
# 2:   B   I    p    w
# 3:   C   H    q    v
# 4:   D   G    r    u
# 5:   E   F    s    t
# 6:   F   E    t    s
# 7:   G   D    u    r
# 8:   H   C    v    q
# 9:   I   B    w    p
#10:   J   A    x    o

第一个目标是根据 reversed 逐行配对元素(src - dst & src1 - dst1)对其进行排序,这可以通过以下方式实现以创建 5 '对':

dt[, key := paste0(pmin(src, dst), pmax(src, dst), pmin(src1, dst1), pmax(src1, dst1))][order(key)]

#    src dst src1 dst1  key
# 1:   A   J    o    x AJox
# 2:   J   A    x    o AJox
# 3:   B   I    p    w BIpw
# 4:   I   B    w    p BIpw
# 5:   C   H    q    v CHqv
# 6:   H   C    v    q CHqv
# 7:   D   G    r    u DGru
# 8:   G   D    u    r DGru
# 9:   E   F    s    t EFst
#10:   F   E    t    s EFst

但是,在现实生活中,至少有一行没有一对(“不完整流”)。所以一个现实生活中的例子是,

#              cdatetime   srcaddr dstaddr  srcport   dstport     key    totals time_diff
# 1: 2017-05-12 14:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
# 2: 2017-05-12 14:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
# 3: 2017-05-17 08:37:16    IP_1    IP_2   54793    8080 182808054793      3    409124
# 4: 2017-05-11 08:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
# 5: 2017-05-11 08:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
# 6: 2017-05-17 08:37:16    IP_1    IP_2   54813    8080 182808054813      3    519888
# 7: 2017-05-02 06:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
# 8: 2017-05-02 06:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
# 9: 2017-05-08 06:57:08    IP_1    IP_2   50794    8080 182808050794      5    518752
#10: 2017-05-11 06:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
#11: 2017-05-11 06:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
#12: 2017-05-04 06:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
#13: 2017-05-04 06:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
#14: 2017-05-04 10:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
#15: 2017-05-04 10:22:26    IP_2    IP_1    8080   51896 182808051896      5         0
#16: 2017-05-08 07:22:47    IP_1    IP_2   51896    8080 182808051896      5    334821
#17: 2017-05-15 05:56:00    IP_1    IP_2   62744     162  17016262744      3        NA
#18: 2017-05-17 10:41:00    IP_1    IP_2   62744     162  17016262744      3    189900
#19: 2017-05-18 09:31:00    IP_1    IP_2   62744     162  17016262744      3     82200

第二个目标现在是删除那些不完整的流程。现在,为了识别那些“不完整的流”,我们计算它们之间的时间差并将 30 设置为阈值。棘手的部分来了;如果我们只有 2 行并且它们之间的间隔超过 30 秒,那么将它们都过滤掉并过滤 3 行(对于某个键),那么 time diff > 30 的那一行需要删除 - 或者如果其中两个 > 30相隔几秒钟,删除所有 3。然而,当我们有 4 行或更多行时,我们需要删除时差 > 30 没有另一对与 。从上表中可以看出,第 3、6、9、16、17、18、19 行必须以相隔 30 秒以上为条件进行删除。查看cdatetime 将阐明哪些是完整的流程。

预期输出为

 #        cdatetime      srcaddr dstaddr srcport dstport          key totals time_diff
# 1: 2017-05-12 14:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
# 2: 2017-05-12 14:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
# 3: 2017-05-11 08:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
# 4: 2017-05-11 08:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
# 5: 2017-05-02 06:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
# 6: 2017-05-02 06:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
# 7: 2017-05-11 06:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
# 8: 2017-05-11 06:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
# 9: 2017-05-04 06:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
#10: 2017-05-04 06:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
#11: 2017-05-04 10:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
#12: 2017-05-04 10:22:26    IP_2    IP_1    8080   51896 182808051896      5         0

以上真实案例的数据

structure(list(cdatetime = structure(c(1494590312, 1494590312, 
1494999436, 1494479548, 1494479548, 1494999436, 1493697076, 1493697076, 
1494215828, 1494473569, 1494473569, 1493869925, 1493869925, 1493882546, 
1493882546, 1494217367, 1494816960, 1495006860, 1495089060), class = c("POSIXct", 
"POSIXt"), tzone = ""), srcaddr = structure(c(1L, 2L, 1L, 1L, 
2L, 1L, 1L, 2L, 1L, 1L, 2L, 1L, 2L, 1L, 2L, 1L, 1L, 1L, 1L), .Label = c("IP_1", 
"IP_2"), class = "factor"), dstaddr = structure(c(2L, 1L, 2L, 
2L, 1L, 2L, 2L, 1L, 2L, 2L, 1L, 2L, 1L, 2L, 1L, 2L, 2L, 2L, 2L
), .Label = c("IP_1", "IP_2"), class = "factor"), srcport = c(54793L, 
8080L, 54793L, 54813L, 8080L, 54813L, 50794L, 8080L, 50794L, 
50794L, 8080L, 51896L, 8080L, 51896L, 8080L, 51896L, 62744L, 
62744L, 62744L), dstport = c(8080L, 54793L, 8080L, 8080L, 54813L, 
8080L, 8080L, 50794L, 8080L, 8080L, 50794L, 8080L, 51896L, 8080L, 
51896L, 8080L, 162L, 162L, 162L), key = c(182808054793, 182808054793, 
182808054793, 182808054813, 182808054813, 182808054813, 182808050794, 
182808050794, 182808050794, 182808050794, 182808050794, 182808051896, 
182808051896, 182808051896, 182808051896, 182808051896, 17016262744, 
17016262744, 17016262744), totals = c(3L, 3L, 3L, 3L, 3L, 3L, 
5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 3L, 3L, 3L), time_diff = c(NA, 
0, 409124, NA, 0, 519888, NA, 0, 518752, 257741, 0, NA, 0, 12621, 
0, 334821, NA, 189900, 82200)), .Names = c("cdatetime", "srcaddr", 
"dstaddr", "srcport", "dstport", "key", "totals", "time_diff"
), row.names = c(NA, -19L), class = c("data.table", "data.frame"
))

以上所有内容都将在大约一个数据集上运行。 180M 行,所以效率是这里的关键词。

【问题讨论】:

  • @Masoud 但我只有一半问题的代码。
  • 你比我清楚。只是一个关于代码审查的建议,你会得到关于性能的很好的答案。如果在这里你永远不会。
  • 看起来你在做核心数据分析。也许 C 或 C++ 将成为性能提升的替代语言。
  • @MinhKieu 同意。但我知道 0 C++/C 编程。 Rcpp 包在这里可能是一个优势

标签: r performance data.table


【解决方案1】:

这是我识别成对的完整流程的建议。 (但是,请注意最后的警告)

library(data.table)   # CRAN version 1.10.4 used 

# set keys to avoid using order() repeatedly
setkey(DT, key, cdatetime)
# compute time diff again, grouped by key,
# using shift() with default "lag" type and a useful fill value 
DT[, time_diff := difftime(cdatetime, shift(cdatetime, fill = -Inf), units = "secs"), by = key]
# now find the begin of a potentially new pair where time_diff > 30 secs
# and count the jumps within each key group using cumsum()
DT[, pair.id := cumsum(time_diff > 30), by = key]
# count the number of partners within each supposedly pair
DT[, count.pairs := .N, .(key, pair.id)]

请注意,在 R 中,逻辑值可以强制转换为数字:

as.integer(FALSE)
#[1] 0
as.integer(TRUE)
#[1] 1

因此,每次 cumsum() 发现 time_diff &gt; 30TRUE 时,pair.id 计数都会增加 1。否则,即如果时间差小于 30 秒,pair.id 保持不变。通过这种方式,可以识别 30 秒时间窗口内的事件对或事件组。

现在,DT增加了两列(注意setkey()改变了行的顺序):

              cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
 1: 2017-05-15 04:56:00    IP_1    IP_2   62744     162  17016262744      3    Inf secs       1           1
 2: 2017-05-17 09:41:00    IP_1    IP_2   62744     162  17016262744      3 189900 secs       2           1
 3: 2017-05-18 08:31:00    IP_1    IP_2   62744     162  17016262744      3  82200 secs       3           1
 4: 2017-05-02 05:51:16    IP_1    IP_2   50794    8080 182808050794      5    Inf secs       1           2
 5: 2017-05-02 05:51:16    IP_2    IP_1    8080   50794 182808050794      5      0 secs       1           2
 6: 2017-05-08 05:57:08    IP_1    IP_2   50794    8080 182808050794      5 518752 secs       2           1
 7: 2017-05-11 05:32:49    IP_1    IP_2   50794    8080 182808050794      5 257741 secs       3           2
 8: 2017-05-11 05:32:49    IP_2    IP_1    8080   50794 182808050794      5      0 secs       3           2
 9: 2017-05-04 05:52:05    IP_1    IP_2   51896    8080 182808051896      5    Inf secs       1           2
10: 2017-05-04 05:52:05    IP_2    IP_1    8080   51896 182808051896      5      0 secs       1           2
11: 2017-05-04 09:22:26    IP_1    IP_2   51896    8080 182808051896      5  12621 secs       2           2
12: 2017-05-04 09:22:26    IP_2    IP_1    8080   51896 182808051896      5      0 secs       2           2
13: 2017-05-08 06:22:47    IP_1    IP_2   51896    8080 182808051896      5 334821 secs       3           1
14: 2017-05-12 13:58:32    IP_1    IP_2   54793    8080 182808054793      3    Inf secs       1           2
15: 2017-05-12 13:58:32    IP_2    IP_1    8080   54793 182808054793      3      0 secs       1           2
16: 2017-05-17 07:37:16    IP_1    IP_2   54793    8080 182808054793      3 409124 secs       2           1
17: 2017-05-11 07:12:28    IP_1    IP_2   54813    8080 182808054813      3    Inf secs       1           2
18: 2017-05-11 07:12:28    IP_2    IP_1    8080   54813 182808054813      3      0 secs       1           2
19: 2017-05-17 07:37:16    IP_1    IP_2   54813    8080 182808054813      3 519888 secs       2           1

现在,必须决定保留哪些行。在这个小样本中,只有具有两个成员 (pairs) 或 singles 的组。

DT[count.pairs > 1]

只显示

              cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
 1: 2017-05-02 05:51:16    IP_1    IP_2   50794    8080 182808050794      5    Inf secs       1           2
 2: 2017-05-02 05:51:16    IP_2    IP_1    8080   50794 182808050794      5      0 secs       1           2
 3: 2017-05-11 05:32:49    IP_1    IP_2   50794    8080 182808050794      5 257741 secs       3           2
 4: 2017-05-11 05:32:49    IP_2    IP_1    8080   50794 182808050794      5      0 secs       3           2
 5: 2017-05-04 05:52:05    IP_1    IP_2   51896    8080 182808051896      5    Inf secs       1           2
 6: 2017-05-04 05:52:05    IP_2    IP_1    8080   51896 182808051896      5      0 secs       1           2
 7: 2017-05-04 09:22:26    IP_1    IP_2   51896    8080 182808051896      5  12621 secs       2           2
 8: 2017-05-04 09:22:26    IP_2    IP_1    8080   51896 182808051896      5      0 secs       2           2
 9: 2017-05-12 13:58:32    IP_1    IP_2   54793    8080 182808054793      3    Inf secs       1           2
10: 2017-05-12 13:58:32    IP_2    IP_1    8080   54793 182808054793      3      0 secs       1           2
11: 2017-05-11 07:12:28    IP_1    IP_2   54813    8080 182808054813      3    Inf secs       1           2
12: 2017-05-11 07:12:28    IP_2    IP_1    8080   54813 182808054813      3      0 secs       1           2

同时

DT[count.pairs <= 1]

显示要删除的单曲:

             cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
1: 2017-05-15 04:56:00    IP_1    IP_2   62744     162  17016262744      3    Inf secs       1           1
2: 2017-05-17 09:41:00    IP_1    IP_2   62744     162  17016262744      3 189900 secs       2           1
3: 2017-05-18 08:31:00    IP_1    IP_2   62744     162  17016262744      3  82200 secs       3           1
4: 2017-05-08 05:57:08    IP_1    IP_2   50794    8080 182808050794      5 518752 secs       2           1
5: 2017-05-08 06:22:47    IP_1    IP_2   51896    8080 182808051896      5 334821 secs       3           1
6: 2017-05-17 07:37:16    IP_1    IP_2   54793    8080 182808054793      3 409124 secs       2           1
7: 2017-05-17 07:37:16    IP_1    IP_2   54813    8080 182808054813      3 519888 secs       2           1

警告 生产数据中可能会发生在 30 秒时间跨度内具有相同键的两个以上事件。有一些选项可以解决这个问题:

  • 只保留精确配对:DT[count.pairs == 2] 忽略所有其他情况。
  • 只保留偶数个合作伙伴的“对”:DT[count.pairs %% 2L == 0L](尽管这可能会保留入站连接数不等于出站连接数的情况)
  • 以及其他需要额外工作的选项,例如,分别计算入站和出站连接。
  • 最后,可以缩短 30 秒的时间窗口。

统计数据可以使用

创建
DT[, .N, count.pairs]
#   count.pairs  N
#1:           1  7
#2:           2 12

在生产数据集上看到这一点会很有趣。

【讨论】:

  • 谢谢@UweBlock。有趣的一点。我明天试试,我们再谈谈。
【解决方案2】:

显然这是可行的:

DT[, keep := 
  (!is.na(time_diff) & time_diff < 30) | 
  shift(time_diff, type="lead", fill = 999) < 30
, by=key]

DT[(keep), !"keep"]

              cdatetime srcaddr dstaddr srcport dstport          key totals time_diff
 1: 2017-05-12 07:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
 2: 2017-05-12 07:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
 3: 2017-05-11 01:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
 4: 2017-05-11 01:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
 5: 2017-05-01 23:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
 6: 2017-05-01 23:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
 7: 2017-05-10 23:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
 8: 2017-05-10 23:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
 9: 2017-05-03 23:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
10: 2017-05-03 23:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
11: 2017-05-04 03:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
12: 2017-05-04 03:22:26    IP_2    IP_1    8080   51896 182808051896      5         0

我想有很多方法可以让这更快,但澄清它是否在做 OP 想要的可能更重要(考虑到代码比 OP 的规则简单得多)。

【讨论】:

  • 谢谢@Frank。简单确实让它有点可疑,但它完全掌握了这里的要求。我已经从工作中的原始 df 中保存了几个子集以进行测试,但直到明天早上我才能做到这一点。
猜你喜欢
  • 2013-12-08
  • 1970-01-01
  • 2015-05-01
  • 1970-01-01
  • 2018-09-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多