【问题标题】:Iterate over ConcurrentHashMap while deleting entries在删除条目时迭代 ConcurrentHashMap
【发布时间】:2016-05-10 00:04:19
【问题描述】:

我想在删除条目时定期迭代 ConcurrentHashMap,如下所示:

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    // do something
    iter.remove();
}

问题是在我迭代时另一个线程可能正在更新或修改值。如果发生这种情况,这些更新可能会永远丢失,因为我的线程在迭代时只会看到陈旧的值,但 remove() 将删除实时条目。

经过一番考虑,我想出了这个解决方法:

map.forEach((key, value) -> {
    // delete if value is up to date, otherwise leave for next round
    if (map.remove(key, value)) {
        // do something
    }
});

这样做的一个问题是它不会捕获对未实现equals()(例如AtomicInteger)的可变值的修改。有没有更好的方法来安全地删除并发修改?

【问题讨论】:

  • 为什么不在做任何工作之前删除条目。
  • @ClaudioCorsi 这不会改变我看到已删除条目的陈旧版本的事实。
  • 问题是您需要能够知道自从您开始遍历地图以来更新了哪些内容。即使您可以知道哪些对象已被更新。另一个线程仍然有可能引用了一个已处理但该对象尚未更新的对象。该对象将被添加回来还是只是被更新?这个对象是否应该产生另一个回调?
  • 条目是短命的还是长命的?如果它们是短暂的,那么您可以考虑使用弱引用作为值,然后只处理引用队列。
  • @ClaudioCorsi 我说的是地图可见的更新,例如put()merge()compute()

标签: java multithreading concurrenthashmap


【解决方案1】:

您的解决方法可行,但存在一种可能的情况。如果某些条目不断更新 map.remove(key,value) 可能永远不会返回 true,直到更新结束。

如果你使用JDK8这里是我的解决方案

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    Map.compute(entry.getKey(), (k, v) -> f(v));
    //do something for prevValue
}
....
private Integer prevValue;

private Integer f(Integer v){
    prevValue = v;
    return null;
}

compute() 将 f(v) 应用于值,在我们的例子中,将值分配给全局变量并删除条目。

根据 Javadoc,它是原子的。

尝试计算指定键及其当前映射值的映射(如果没有当前映射,则为 null)。整个方法调用以原子方式执行。在计算过程中,其他线程对该地图的一些尝试更新操作可能会被阻止,因此计算应该简短而简单,并且不得尝试更新该地图的任何其他映射。

【讨论】:

  • compute() 将在 f() 返回 null 时返回 null。
  • 此外,使用您的方法迭代键会更直接。
【解决方案2】:

您的解决方法实际上非常好。还有其他设施,您可以在这些设施之上构建一些类似的解决方案(例如,使用 computeIfPresent() 和墓碑值),但它们有自己的注意事项,我在略有不同的用例中使用了它们。

至于使用不为映射值实现equals() 的类型,您可以在相应类型之上使用自己的包装器。这是将对象相等的自定义语义注入ConcurrentMap 提供的原子替换/删除操作的最直接方法。

更新

这是一个草图,展示了如何在 ConcurrentMap.remove(Object key, Object value) API 之上构建:

  • 在您用于值的可变类型之上定义一个包装器类型,同时在当前可变值之上定义您的自定义 equals() 方法。
  • 在您的 BiConsumer(您传递给 forEach 的 lambda)中,创建该值的深层副本(属于您的新包装器类型)并执行您的逻辑来确定是否需要删除该值在副本上。
  • 如果需要删除该值,请致电remove(myKey, myValueCopy)
    • 如果在计算是否需要删除该值时发生了一些并发更改,remove(myKey, myValueCopy) 将返回 false(不包括 ABA 问题,这是一个单独的主题)。

这里有一些代码说明了这一点:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Playground {

   private static class AtomicIntegerWrapper {
      private final AtomicInteger value;

      AtomicIntegerWrapper(int value) {
         this.value = new AtomicInteger(value);
      }

      public void set(int value) {
         this.value.set(value);
      }

      public int get() {
         return this.value.get();
      }

      @Override
      public boolean equals(Object obj) {
         if (this == obj) {
            return true;
         }
         if (!(obj instanceof AtomicIntegerWrapper)) {
            return false;
         }
         AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj;
         if (other.value.get() == this.value.get()) {
            return true;
         }
         return false;
      }

      public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) {
         int wrapped = wrapper.get();
         return new AtomicIntegerWrapper(wrapped);
      }
   }

   private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP
         = new ConcurrentHashMap<>();

   private static final int NUM_THREADS = 3;

   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < 10; ++i) {
         MAP.put(i, new AtomicIntegerWrapper(1));
      }

      Thread.sleep(1);

      for (int i = 0; i < NUM_THREADS; ++i) {
         new Thread(() -> {
            Random rnd = new Random();
            while (!MAP.isEmpty()) {
               MAP.forEach((key, value) -> {
                  AtomicIntegerWrapper elem = MAP.get(key);
                  if (elem == null) {
                     System.out.println("Oops...");
                  } else if (elem.get() == 1986) {
                     elem.set(1);
                  } else if ((rnd.nextInt() & 128) == 0) {
                     elem.set(1986);
                  }
               });
            }
         }).start();
      }

      Thread.sleep(1);

      new Thread(() -> {
         Random rnd = new Random();
         while (!MAP.isEmpty()) {
            MAP.forEach((key, value) -> {
               AtomicIntegerWrapper elem =
                     AtomicIntegerWrapper.deepCopy(MAP.get(key));
               if (elem.get() == 1986) {
                  try {
                     Thread.sleep(10);
                  } catch (Exception e) {}
                  boolean replaced = MAP.remove(key, elem);
                  if (!replaced) {
                     System.out.println("Bailed out!");
                  } else {
                     System.out.println("Replaced!");
                  }
               }
            });
         }
      }).start();
   }
}

您会看到“Bailed out!”的打印输出,与“Replaced!”混合在一起。 (删除成功,因为没有您关心的并发更新)并且计算将在某个时候停止。

  • 如果您移除自定义 equals() 方法并继续使用副本,您将看到源源不断的“Bailed out!”,因为副本永远不会被视为等于地图中的值。
  • 如果您不使用副本,您将不会看到“Bailed out!”打印出来,你就会遇到你正在解释的问题 - 无论并发更改如何,值都会被删除。

【讨论】:

  • 其实,我觉得我的equals() 点有点无关紧要。只要值被改变而不是被替换,就会出现问题,因为remove() 看到的是相同的引用。如果我们愿意为每次更新创建一个新包装器,我想包装器会起作用。
  • 除非我弄错了,否则我认为这种方法适用于变异的值。我将更新我的答案以添加说明该方法的代码示例。
  • 您的解决方案有效,但我的想法对我的要求来说更简单一些。我不需要在删除之前验证当前值;只要我能看到最新版本,我就想删除 any 值。所以我需要做的就是在更新时创建一个浅包装副本以避免引用相等,并依靠remove() 的原子性保证和更新操作(通过compute()merge() 等)来确保值可以'成功移除后不会发生变异。
  • 我的方法的另一个好处是包装器不需要实现equals(),它可以是完全通用的,因为它不需要特定于类型的逻辑。就我的项目而言,重要的是这些值可以是通用的。
  • 第二次(第五次?)认为,我认为可变值根本不是问题。事实上,我们甚至可以将Iterator.remove() 用于可变值,因为它们不会过时,并且映射的原子性保证确保在删除后不会有任何修改。唯一开始的问题是不可变的值,它可以被替换,从而变得陈旧。该问题的解决方案是我调用map.remove(Object, Object) 以确保删除的值与我们看到的相同。
【解决方案3】:

让我们考虑一下您有哪些选择。

  1. 使用isUpdated() 操作创建您自己的容器类并使用您自己的解决方法。

  2. 如果您的地图只包含几个元素,并且与放置/删除操作相比,您对地图的迭代非常频繁。使用CopyOnWriteArrayList 可能是一个不错的选择 CopyOnWriteArrayList&lt;Entry&lt;Integer, Integer&gt;&gt; lookupArray = ...;

  3. 另一种选择是实现自己的CopyOnWriteMap

    public class CopyOnWriteMap<K, V> implements Map<K, V>{
    
        private volatile Map<K, V> currentMap;
    
        public V put(K key, V value) {
            synchronized (this) {
                Map<K, V> newOne = new HashMap<K, V>(this.currentMap);
                V val = newOne.put(key, value);
                this.currentMap = newOne; // atomic operation
                return val;
            }
        }
    
        public V remove(Object key) {
            synchronized (this) {
                Map<K, V> newOne = new HashMap<K, V>(this.currentMap);
                V val = newOne.remove(key);
                this.currentMap = newOne; // atomic operation
                return val;
            }
        }
    
        [...]
    }
    

有负面影响。如果您使用的是写时复制集合,您的更新将永远不会丢失,但您可以再次看到一些以前删除的条目。

最坏的情况:如果地图被复制,每次删除的条目都会被恢复。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-11-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-10
    • 2020-08-09
    • 2020-08-22
    相关资源
    最近更新 更多