【问题标题】:is java AtomicReference thread safe when used within parallelStream?在parallelStream中使用时java AtomicReference线程安全吗?
【发布时间】:2016-08-31 12:21:22
【问题描述】:

我使用parallelStream来获取数组中最长的字符串,代码如下,每次运行时,我都会得到不同的结果。 AtomicReference 即使在 parallelStream 中使用也应该是线程安全的?但是为什么会这样呢?

public static void main(String[] args) {
  AtomicReference<String> longest = new AtomicReference<>();
  LongAccumulator accumulator = new LongAccumulator(Math::max, 0);
  List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");
  words.parallelStream().forEach(
          next -> longest.updateAndGet(
                     current -> {
                        String result = next.length() > accumulator.intValue() ? next : current;
                        accumulator.accumulate(next.length());
                        return result;
                     }
                  )
  );
  System.out.println(longest.get());
}

有一次,我会打印“祝贺”,有时会打印“平台”。

【问题讨论】:

  • 可以改用words.parallelStream().max(Comparator.comparingInt(String::length));吗?我试过了,它总是返回相同的词。但是我不能 100% 确定它是否可以并行执行。
  • @Clayn:这就是并发的问题——你永远无法确定。但我可以断言,您的变体是正确的。
  • @Holger 感谢您的断言。我首先想到的是:我确信使用 Stream API 可以做得更优雅

标签: java java-8


【解决方案1】:

您正在调用LongAccumulator.intValue(),记录为:

在缩小原语转换后将 current value 作为 int 返回。

并按照我们将学习的get() 方法的链接:

返回当前值。返回值是不是原子快照;在没有并发更新的情况下调用会返回准确的结果,但在计算值时发生的并发更新可能不会被合并。

所以虽然AtomicReference.updateAndGet 操作是线程安全的,但您对LongAccumulator.intValue()LongAccumulator.accumulate 的并发调用却不是。 LongAccumulator 用于执行并发 accumulate 操作,然后在所有累加操作完成后获取结果。请注意,即使 get() 正在返回正确的快照,intValue() 的调用和随后的 accumulate() 的调用是两个不同的因此非原子的操作,这使得该操作仍然容易发生数据竞争。

在大多数情况下,如果您发现自己试图操作 forEach 中的数据结构,则说明您使用了错误的工具来完成这项工作,从而使代码变得不必要地复杂且容易出错。 作为克莱恩hinted in a comment,一个 words.parallelStream().max(Comparator.comparingInt(String::l‌​ength)) 会简洁准确地完成工作。

【讨论】:

    【解决方案2】:

    实际上我写的是@Holger 已经提到的问题,我有点晚了,但无论如何我写这篇文章是为了证明@Holger's answer。你可以使用AtomicReference的累加器;

    代码如下:

    public static void main(String[] args) {
        AtomicReference < String > longest = new AtomicReference < > ();
        List < String > words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");
    
        words.parallelStream().forEach(next - > {
            longest.accumulateAndGet(next, (a, b) - >
                a != null && a.length() > b.length() ? a : b
            );
        });
    
        System.out.println(longest.get());
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-08-09
      • 1970-01-01
      • 2011-01-23
      • 1970-01-01
      • 2011-04-27
      • 1970-01-01
      • 2016-08-24
      相关资源
      最近更新 更多