【问题标题】:How to make use of sparse matrix to optimise evaluation on many shifted points如何利用稀疏矩阵优化对多个移位点的评估
【发布时间】:2025-11-21 16:45:03
【问题描述】:

我目前正在尝试有效地解决以下问题:

我有两个向量 v1v2 以及一个向量化函数 f。对于v1 中的每个x,我想计算f(x - v2) 的平均值。这个问题的特别之处在于f 将在许多输入上返回零。

示例:

set.seed(0)

v1 <- rnorm(1000)
v2 <- rnorm(1000)

f <- function(x) {
  ret <- double(length(x)) + 1
  ret[abs(x) > 0.01] <- 0
  ret
}

solution_01 <- function(v1, v2, f) {
  ret <- numeric(length(v1))
  for (x in v2) {
    ret <- ret + f(v1 - x)
  }
  ret/length(v2)
}

solution_02 <- function(v1, v2, f) {
  apply(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)), 1, sum)/length(v2)
}

solution_03 <- function(v1, v2, f) {
  rowSums(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)))/length(v2)
}

solution_04 <- function(v1, v2, f) {
  rowMeans(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)))
}

s1 <- solution_01(v1, v2, f)
s2 <- solution_02(v1, v2, f)
s3 <- solution_03(v1, v2, f)
s4 <- solution_04(v1, v2, f)

all.equal(s1, s2)
all.equal(s2, s3)
all.equal(s3, s4)

bench::mark(solution_01(v1, v2, f), solution_02(v1, v2, f), solution_03(v1, v2, f), solution_04(v1, v2, f))

# Sparsity
eval_points <- outer(v1, v2, `-`)
sum(f(eval_points) == 0)/length(eval_points)

如您所见,我已经实施了四种可能的解决方案。目前,naive 解决方案(使用 for 循环)是最快的。我认为这是因为其他实现依赖于outer,这需要一些时间来分配所需的内存。

如何优化此代码?有没有办法利用f(outer(v1, v_2)) 的稀疏性?

【问题讨论】:

  • 我认为您主要对速度感兴趣。我认为使用稀疏矩阵来存储结果在内存方面最有用,而不是速度。如果您需要更快的速度,在这种情况下我会选择 Rcpp。
  • @F.Privé 在这种情况下 Rcpp 可以做什么?我会在 C++ 中编写这个确切的循环吗?我熟悉基本的 C++,但不能做太多优化。会有多大影响?
  • @F.Privé 我尝试使用 Rcpp 并提出后续问题 here

标签: r performance optimization microbenchmark


【解决方案1】:

您提出的问题(我很欣赏这可能是您实际尝试做的事情的简化)将通过使用排序向量得到有效解决,尤其是当向量的元素长于 1000 个时。

由于您在 v1 中的值的一小段距离内计算 v2 中的值,因此您可以使用一种算法推进第一个向量,直到它超出第二个考虑的元素范围,然后切换到通过另一个向量前进。这样,您只需通过每个向量一次,而不是 length(v1) 次。

正如指出的那样,R 在这类事情上效率不高,如果你希望它真的很快,你应该在 Rcpp 中编码整个事情。

...

我以为我会尝试编写一个有趣的算法,结果发现即使用 R 编写,它在示例数据上的速度也快了近 10 倍!

solution_05 <- function(v1, v2, f) {
  vs1 <- sort(v1)
  vs2 <- sort(v2)
  n1 <- integer(length(v1))
  i1 <- 1
  i2 <- 1
  l <- length(v1)
  for (x2 in vs2) {
    # advance i1 until v1 is in range below
    while (x2-vs1[i1] > 0.01 & i1 <= l) i1 <- i1+1
    if (i2 > i1) n1[i1:(i2-1)] <- n1[i1:(i2-1)]+1 else i2 <- i1
    # advance i2 until out of range adding 1 to n1[i2] each time
    while (vs1[i2]-x2 <= 0.01 & i2 <= l) {
      n1[i2] <- n1[i2]+1
      i2 <- i2+1
    }
  }
  s5 <- n1[rank(v1)]/length(v2)
  return(s5)
}

【讨论】:

    最近更新 更多