【问题标题】:ConcurrentHashMap: avoid extra object creation with "putIfAbsent"?ConcurrentHashMap:避免使用“putIfAbsent”创建额外的对象?
【发布时间】:2012-05-31 09:34:40
【问题描述】:

我正在为多线程环境中的键聚合多个值。密钥是事先不知道的。我以为我会做这样的事情:

class Aggregator {
    protected ConcurrentHashMap<String, List<String>> entries =
                            new ConcurrentHashMap<String, List<String>>();
    public Aggregator() {}

    public void record(String key, String value) {
        List<String> newList =
                    Collections.synchronizedList(new ArrayList<String>());
        List<String> existingList = entries.putIfAbsent(key, newList);
        List<String> values = existingList == null ? newList : existingList;
        values.add(value);
    }
}

我看到的问题是,每次运行此方法时,我都需要创建一个 ArrayList 的新实例,然后将其丢弃(在大多数情况下)。这似乎是对垃圾收集器的不合理滥用。有没有更好的、线程安全的方法来初始化这种结构而不必synchronizerecord 方法?我对让 putIfAbsent 方法不返回新创建的元素的决定感到有些惊讶,而且除非需要(可以这么说),否则缺乏延迟实例化的方法。

【问题讨论】:

  • 除非有基准,否则不要担心额外的对象。短期对象分配/GC 在现代 JVM 中很便宜。 (我猜“some”不仅仅是“none”,但现代 JVM 通常对此没有任何问题。)无论如何,这仍然是一个有趣的问题,因为它有望产生一些有趣的方法。 “延迟”分配等在 Java 中有点尴尬,因此并不常见,因为缺少闭包或“按名称传递”语义。 (匿名类并不那么性感,通常需要有一个相应的接口以及putIf.. 的重载)。
  • 两年多过去了,仍然是一个很好的问题。我真的希望 Java 1.8 为默认对象的延迟实例化添加了类似 putIfAbsent (K key, Supplier&lt;V&gt; value) 的内容。它肯定已经在ConcurrentMap 接口上改进了对流 API 的其他支持。
  • @sparc_spread Java 1.8 向 MapConcurrentMap 添加了一个 computeIfAbsent(K key, Function&lt;K, V&gt; mappingFunction) 方法,该方法应该可以满足您对默认对象的延迟实例化的要求。

标签: java synchronization thread-safety concurrenthashmap


【解决方案1】:

Java 8 引入了一个 API 来解决这个确切的问题,提供了一个单行解决方案:

public void record(String key, String value) {
    entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())).add(value);
}

对于 Java 7:

public void record(String key, String value) {
    List<String> values = entries.get(key);
    if (values == null) {
        entries.putIfAbsent(key, Collections.synchronizedList(new ArrayList<String>()));
        // At this point, there will definitely be a list for the key.
        // We don't know or care which thread's new object is in there, so:
        values = entries.get(key);
    }
    values.add(value);
}

这是填充 ConcurrentHashMap 时的标准代码模式。

特殊方法putIfAbsent(K, V)) 将把你的值对象放入,或者如果另一个线程在你之前,那么它将忽略你的值对象。无论哪种方式,在调用putIfAbsent(K, V)) 之后,get(key) 保证在线程之间是一致的,因此上面的代码是线程安全的。

唯一浪费的开销是,如果某个其他线程同时为同一个键添加了一个新条目:您可能最终会丢弃新创建的值,但这只有在有还不是一个条目并且你的线程输掉了一场比赛,这通常很少见。

【讨论】:

  • 我想念有一个 Java 相当于 Python 的 defaultdict... 还有其他人吗?
  • 所以putIfAbsent 调用将values 变量替换为null;此代码需要测试以查看 putIfAbsent 是否返回 null。
  • @Bohemian 你是什么意思它不是场景的一部分?您的意思是您必须保证这种情况永远不会发生,因为如果发生了,您的代码将无法安全地处理它?
  • 我知道这是一个旧答案,但@Erlend 是正确的。另一个线程可能会删除对putIfAbsentget 的调用之间的条目。 Gene 的答案是正确的实现,保证您不会在调用 values.add() 时抛出 NPE。
【解决方案2】:

从 Java-8 开始,您可以使用以下模式创建多地图:

public void record(String key, String value) { entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())) .add(value); }

ConcurrentHashMap 文档(不是通用合同)指定 ArrayList 只会为每个键创建一次,在为新键创建 ArrayList 时会延迟更新,这会产生轻微的初始成本:

http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function-

【讨论】:

  • 我经常这样做,为此我写了一个模块:github.com/ansell/JDefaultDict
  • 此外,如果您可以使用 Collection 而不是 List,最好使用 ConcurrentLinkedQueue 来实现真正的并发解决方案。
  • 是的,但它比此处其他答案中提出的 putIfAbsent 方法慢得多。根据设置,它从中等竞争的 2 次到高竞争的 50 次较慢。
  • 我已经完成了基准测试:stackoverflow.com/questions/44969543/…
【解决方案3】:

最后,我对@Bohemian 的回答进行了轻微修改。他提出的解决方案用putIfAbsent 调用覆盖了values 变量,这产生了我之前遇到的同样问题。似乎工作的代码如下所示:

    public void record(String key, String value) {
        List<String> values = entries.get(key);
        if (values == null) {
            values = Collections.synchronizedList(new ArrayList<String>());
            List<String> values2 = entries.putIfAbsent(key, values);
            if (values2 != null)
                values = values2;
        }
        values.add(value);
    }

它没有我想要的那么优雅,但它比在每次调用时创建一个新的ArrayList 实例的原始版本要好。

【讨论】:

  • 从 Java-8 开始,您可以将其替换为: entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList())).add(value)
  • @Peter 请发表您的评论作为答案,因为使用 lambadas 是最优雅、最清晰的方式。
  • 如果密钥不存在,原始答案只会创建一个 ArrayList 实例,所以这对我来说似乎并不算太​​糟糕。
  • @Peter 这是执行时间最快的方法,在竞争激烈的环境中,它比“lambda”方法快 2 到 50 倍。
【解决方案4】:

根据 Gene 的回答创建了两个版本

public  static <K,V> void putIfAbsetMultiValue(ConcurrentHashMap<K,List<V>> entries, K key, V value) {
    List<V> values = entries.get(key);
    if (values == null) {
        values = Collections.synchronizedList(new ArrayList<V>());
        List<V> values2 = entries.putIfAbsent(key, values);
        if (values2 != null)
            values = values2;
    }
    values.add(value);
}

public  static <K,V> void putIfAbsetMultiValueSet(ConcurrentMap<K,Set<V>> entries, K key, V value) {
    Set<V> values = entries.get(key);
    if (values == null) {
        values = Collections.synchronizedSet(new HashSet<V>());
        Set<V> values2 = entries.putIfAbsent(key, values);
        if (values2 != null)
            values = values2;
    }
    values.add(value);
}

效果很好

【讨论】:

    【解决方案5】:

    这是一个我也在寻找答案的问题。 putIfAbsent 方法实际上并没有解决额外的对象创建问题,它只是确保这些对象中的一个不会替换另一个。但是线程之间的竞争条件会导致多个对象实例化。我可以为这个问题找到 3 个解决方案(我会按照这个优先顺序):

    1- 如果您使用的是 Java 8,实现此目的的最佳方法可能是 ConcurrentMap 的新 computeIfAbsent 方法。您只需要给它一个将同步执行的计算函数(至少对于ConcurrentHashMap 实现)。示例:

    private final ConcurrentMap<String, List<String>> entries =
            new ConcurrentHashMap<String, List<String>>();
    
    public void method1(String key, String value) {
        entries.computeIfAbsent(key, s -> new ArrayList<String>())
                .add(value);
    }
    

    这是来自ConcurrentHashMap.computeIfAbsent的javadoc:

    如果指定的键尚未与值关联,则尝试 使用给定的映射函数计算其值并输入 进入这张地图,除非为空。执行整个方法调用 原子地,因此每个键最多应用该函数一次。一些 其他线程在此地图上尝试的更新操作可能是 在计算过程中被阻塞,所以计算应该是 简短而简单,并且不得尝试更新任何其他映射 这张地图。

    2- 如果你不能使用 Java 8,你可以使用GuavaLoadingCache,它是线程安全的。您为它定义了一个加载函数(就像上面的compute 函数一样),您可以确定它会被同步调用。示例:

    private final LoadingCache<String, List<String>> entries = CacheBuilder.newBuilder()
            .build(new CacheLoader<String, List<String>>() {
                @Override
                public List<String> load(String s) throws Exception {
                    return new ArrayList<String>();
                }
            });
    
    public void method2(String key, String value) {
        entries.getUnchecked(key).add(value);
    }
    

    3- 如果您也不能使用 Guava,您可以随时手动同步并进行双重检查锁定。示例:

    private final ConcurrentMap<String, List<String>> entries =
            new ConcurrentHashMap<String, List<String>>();
    
    public void method3(String key, String value) {
        List<String> existing = entries.get(key);
        if (existing != null) {
            existing.add(value);
        } else {
            synchronized (entries) {
                List<String> existingSynchronized = entries.get(key);
                if (existingSynchronized != null) {
                    existingSynchronized.add(value);
                } else {
                    List<String> newList = new ArrayList<>();
                    newList.add(value);
                    entries.put(key, newList);
                }
            }
        }
    }
    

    我对所有这 3 个方法以及非同步方法做了一个示例实现,这会导致额外的对象创建:http://pastebin.com/qZ4DUjTr

    【讨论】:

      【解决方案6】:

      Java 1.7.40 处理空数组列表创建问题的内存浪费(还有 GC 等)。不要担心创建空数组列表。 参考:http://javarevisited.blogspot.com.tr/2014/07/java-optimization-empty-arraylist-and-Hashmap-cost-less-memory-jdk-17040-update.html

      【讨论】:

        【解决方案7】:

        putIfAbsent 的方法具有最快的执行时间,在竞争激烈的环境中,它比“lambda”方法快 2 到 50 倍。 Lambda 不是这种“powerloss”背后的原因,问题是在 Java-9 优化之前computeIfAbsent 内部的强制同步。

        基准:

        import java.util.Random;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.atomic.AtomicInteger;
        import java.util.concurrent.atomic.AtomicLong;
        
        public class ConcurrentHashMapTest {
            private final static int numberOfRuns = 1000000;
            private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
            private final static int keysSize = 10;
            private final static String[] strings = new String[keysSize];
            static {
                for (int n = 0; n < keysSize; n++) {
                    strings[n] = "" + (char) ('A' + n);
                }
            }
        
            public static void main(String[] args) throws InterruptedException {
                for (int n = 0; n < 20; n++) {
                    testPutIfAbsent();
                    testComputeIfAbsentLamda();
                }
            }
        
            private static void testPutIfAbsent() throws InterruptedException {
                final AtomicLong totalTime = new AtomicLong();
                final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
                final Random random = new Random();
                ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        
                for (int i = 0; i < numberOfThreads; i++) {
                    executorService.execute(new Runnable() {
                        @Override
                        public void run() {
                            long start, end;
                            for (int n = 0; n < numberOfRuns; n++) {
                                String s = strings[random.nextInt(strings.length)];
                                start = System.nanoTime();
        
                                AtomicInteger count = map.get(s);
                                if (count == null) {
                                    count = new AtomicInteger(0);
                                    AtomicInteger prevCount = map.putIfAbsent(s, count);
                                    if (prevCount != null) {
                                        count = prevCount;
                                    }
                                }
                                count.incrementAndGet();
                                end = System.nanoTime();
                                totalTime.addAndGet(end - start);
                            }
                        }
                    });
                }
                executorService.shutdown();
                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
                System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                        + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
            }
        
            private static void testComputeIfAbsentLamda() throws InterruptedException {
                final AtomicLong totalTime = new AtomicLong();
                final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
                final Random random = new Random();
                ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
                for (int i = 0; i < numberOfThreads; i++) {
                    executorService.execute(new Runnable() {
                        @Override
                        public void run() {
                            long start, end;
                            for (int n = 0; n < numberOfRuns; n++) {
                                String s = strings[random.nextInt(strings.length)];
                                start = System.nanoTime();
        
                                AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
                                count.incrementAndGet();
        
                                end = System.nanoTime();
                                totalTime.addAndGet(end - start);
                            }
                        }
                    });
                }
                executorService.shutdown();
                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
                System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                        + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
            }
        
        }
        

        结果:

        Test testPutIfAbsent average time per run: 115.756501 ns
        Test testComputeIfAbsentLamda average time per run: 276.9667055 ns
        Test testPutIfAbsent average time per run: 134.2332435 ns
        Test testComputeIfAbsentLamda average time per run: 223.222063625 ns
        Test testPutIfAbsent average time per run: 119.968893625 ns
        Test testComputeIfAbsentLamda average time per run: 216.707419875 ns
        Test testPutIfAbsent average time per run: 116.173902375 ns
        Test testComputeIfAbsentLamda average time per run: 215.632467375 ns
        Test testPutIfAbsent average time per run: 112.21422775 ns
        Test testComputeIfAbsentLamda average time per run: 210.29563725 ns
        Test testPutIfAbsent average time per run: 120.50643475 ns
        Test testComputeIfAbsentLamda average time per run: 200.79536475 ns
        

        【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-12-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-03-30
        • 1970-01-01
        • 1970-01-01
        • 2017-11-07
        相关资源
        最近更新 更多