【问题标题】:Concurrent Set Queue并发集合队列
【发布时间】:2011-03-08 10:08:52
【问题描述】:

也许这是一个愚蠢的问题,但我似乎找不到一个明显的答案。

我需要一个仅包含唯一值的并发 FIFO 队列。尝试添加队列中已经存在的值只会忽略该值。如果不是为了线程安全,那将是微不足道的。是否有 Java 中的数据结构或互联网上的代码片段表现出这种行为?

【问题讨论】:

  • 不幸的是,“队列”这个词是模棱两可的,对于一些读者来说它隐含的意思是“先进先出队列”,而对其他人来说它具有更一般的java.util.Queue 含义,这基本上意味着任何具有 一些“头元素”的概念,无论该元素是否是先进的。所以!是哪个?
  • 先进先出,抱歉遗漏=)

标签: java collections concurrency queue set


【解决方案1】:

如果您想要比完全同步更好的并发性,我知道有一种方法可以做到这一点,即使用 ConcurrentHashMap 作为支持映射。以下仅为草图。

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

采用这种方法,一切都不是阳光。除了使用支持地图的entrySet().iterator().next() 之外,我们没有合适的方法来选择头部元素,结果是随着时间的推移地图变得越来越不平衡。由于更大的桶冲突和更大的段争用,这种不平衡是一个问题。

注意:这段代码在一些地方使用了Guava

【讨论】:

  • 这如何保持Queue 的顺序?迭代顺序取决于支持映射的实现。
  • 保留什么顺序?我没有看到关于订单的任何要求,除非假设他想要一个 FIFO 队列......我添加了一条评论来询问。
  • @NitsanWakart 我要回答的问题是如何获得Queue
【解决方案2】:

没有一个内置的集合可以做到这一点。有一些并发的Set 实现可以与并发的Queue 一起使用。

例如,一个项目只有在成功添加到集合后才会被添加到队列中,并且从队列中删除的每个项目都会从集合中删除。在这种情况下,从逻辑上讲,队列的内容实际上是集合中的任何内容,并且队列仅用于跟踪订单并提供仅在 BlockingQueue 上发现的高效 take()poll() 操作。

【讨论】:

  • 我的一个实现使用了 LinkedHashSet,因此我只有一个数据结构并且可以依赖于顺序。但是,ConcurrentQueue 背后的算法比使用同步锁要复杂得多,我想知道是否存在具有额外唯一性约束的高性能集合。
  • @Ambience - 没有这样的集合。我的技术允许您使用并发的Queue(如果您需要take(),则可以使用LinkedBlockingQueue,如果您只需要poll(),则可以使用ConcurrentLinkedQueue),保留FIFO 排序,同时添加Set 类似的唯一性。跨度>
【解决方案3】:

在有足够的理由考虑替代方案之前,我会使用同步的 LinkedHashSet。更多并发解决方案可以提供的主要好处是锁拆分。

最简单的并发方法是 ConcurrentHashMap(作为集合)和 ConcurrentLinkedQueue。操作的顺序将提供所需的约束。 offer() 将首先执行 CHM#putIfAbsent(),如果成功插入 CLQ。 poll() 将从 CLQ 中获取,然后将其从 CHM 中删除。这意味着如果它在映射中并且 CLQ 提供排序,我们将考虑在我们的队列中的条目。然后可以通过增加映射的并发级别来调整性能。如果您能够容忍额外的 racy-ness,那么廉价的 CHM#get() 可以作为一个合理的前提条件(但它可能会因略显陈旧的观点而受到影响)。

【讨论】:

    【解决方案4】:

    java.util.concurrent.ConcurrentLinkedQueue 可以帮助您顺利到达目的地。

    用您自己的类包装 ConcurrentLinkedQueue 以检查添加的唯一性。您的代码必须是线程安全的。

    【讨论】:

    • 一旦包裹,它可能不再是线程安全的,即使它基于ConcurrentLinkedQueue
    • 我的第一遍实现几乎就是这样做的,但我担心在链表支持的队列上调用 .contains() 的成本以及同步队列方法完全否定了底层的好处ConcurrentLinkdQueue 算法。
    • @Ambience:基于 ConcurrentLinkedQueue 的 Javadoc,我猜 contains() 方法的调用时间受 N 限制。您应该在 ConcurrentSetQueue 中同步的唯一方法是添加方法。其他方法在 ConcurrentLinkedQueue 中同步。
    【解决方案5】:

    你所说的具有 Set 语义的并发队列是什么意思?如果您的意思是真正的并发结构(而不​​是线程安全结构),那么我认为您要求的是小马。

    例如,如果您调用put(element) 并检测到某些东西已经存在并立即被删除,会发生什么情况?例如,如果offer(element) || queue.contains(element) 返回false,对您来说意味着什么?

    在并发世界中,这类事情通常需要稍微不同地思考,因为除非你停止世界(锁定它),否则通常什么都不像看起来那样。否则,您通常会看到过去的事情。那么,你到底想做什么呢?

    【讨论】:

    • 有趣的观察,如果不是答案 =) 你是否低于评论点阈值?
    【解决方案6】:

    也许扩展ArrayBlockingQueue。为了访问(包访问)锁,我不得不将我的子类放在同一个包中。警告:我没有对此进行测试。

    package java.util.concurrent;
    
    import java.util.Collection;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> {
    
        public DeDupingBlockingQueue(int capacity) {
            super(capacity);
        }
    
        public DeDupingBlockingQueue(int capacity, boolean fair) {
            super(capacity, fair);
        }
    
        public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
            super(capacity, fair, c);
        }
    
        @Override
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (contains(e)) return false;
                return super.add(e);
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (contains(e)) return true;
                return super.offer(e);
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public void put(E e) throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly(); //Should this be lock.lock() instead?
            try {
                if (contains(e)) return;
                super.put(e); //if it blocks, it does so without holding the lock.
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (contains(e)) return true;
                return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock.
            } finally {
                lock.unlock();
            }
        }
    }
    

    【讨论】:

      【解决方案7】:

      唯一对象队列的简单答案如下:

      import java.util.concurrent.ConcurrentLinkedQueue;
      
      public class FinalQueue {
      
          class Bin {
              private int a;
              private int b;
      
              public Bin(int a, int b) {
                  this.a = a;
                  this.b = b;
              }
      
              @Override
              public int hashCode() {
                  return a * b;
              }
      
              public String toString() {
                  return a + ":" + b;
              }
      
              @Override
              public boolean equals(Object obj) {
                  if (this == obj)
                      return true;
                  if (obj == null)
                      return false;
                  if (getClass() != obj.getClass())
                      return false;
                  Bin other = (Bin) obj;
                  if ((a != other.a) || (b != other.b))
                      return false;
                  return true;
              }
          }
      
          private ConcurrentLinkedQueue<Bin> queue;
      
          public FinalQueue() {
              queue = new ConcurrentLinkedQueue<Bin>();
          }
      
          public synchronized void enqueue(Bin ipAddress) {
              if (!queue.contains(ipAddress))
                  queue.add(ipAddress);
          }
      
          public Bin dequeue() {
              return queue.poll();
          }
      
          public String toString() {
              return "" + queue;
          }
      
          /**
           * @param args
           */
          public static void main(String[] args) {
              FinalQueue queue = new FinalQueue();
              Bin a = queue.new Bin(2,6);
      
              queue.enqueue(a);
              queue.enqueue(queue.new Bin(13, 3));
              queue.enqueue(queue.new Bin(13, 3));
              queue.enqueue(queue.new Bin(14, 3));
              queue.enqueue(queue.new Bin(13, 9));
              queue.enqueue(queue.new Bin(18, 3));
              queue.enqueue(queue.new Bin(14, 7));
              Bin x= queue.dequeue();
              System.out.println(x.a);
              System.out.println(queue.toString());
              System.out.println("Dequeue..." + queue.dequeue());
              System.out.println("Dequeue..." + queue.dequeue());
              System.out.println(queue.toString());
          }
      }
      

      【讨论】:

        猜你喜欢
        • 2011-04-18
        • 2017-06-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多