【问题标题】:Java best practice for handling high amounts of stateful events处理大量有状态事件的 Java 最佳实践
【发布时间】:2014-09-04 16:40:12
【问题描述】:

我正在寻找实用程序类或最佳实践模式来处理我的应用程序中的大量传入有状态事件。

想象一个生产者产生许多事件,然后由作用于这些事件的应用程序使用。现在在某些情况下,生产者产生的事件比消费者实际处理的要多,但是由于所有事件都是有状态的,因此是否会错过某些事件并不重要,因为最新事件包含先前事件传达的所有信息。

我现在编写了以下 java 代码来处理这些情况,但我不确定这是否是正确的方法,以及是否没有更简单、更好、更安全的方法。

private static ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1);
private final static Object lock = new Object();
private static List<EventData> lastEventData = null;

static {
    executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            synchronized(lock) {
                while(lastEventData == null && !executorService.isShutdown()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ex) { ... }
                }
                try {
                    actUponEvent(lastEventData);
                } catch (Throwable ex) { ... }
                lastEventData = null;
            }
        }
    }, 250, 250, TimeUnit.MILLISECONDS);
}


public synchronized update(final List<EventData> data) {
    synchronized(lock) {
        lastEventData = data;
        lock.notifyAll();
    }
}

public void dispose() {
    executorService.shutdown();
}

换句话说,我想在事件通知到达后立即收到通知,但将它们限制为每 250 毫秒一个事件,我只对最后一个传入事件感兴趣。

我通过 java.util.concurrent 查看了一些提示/现有解决方案,但找不到任何适合我的问题的东西。 BlockingQueue 起初似乎非常好,因为如果为空,它会阻塞,但另一方面,队列本身对我来说并不重要,因为无论如何我只对最新事件感兴趣,如果满了则插入阻塞不是我也在寻找什么。

【问题讨论】:

  • 能否有比我更了解并发的人查看代码并告诉我它是否真的有意义,以及可能存在的陷阱?

标签: java events concurrency event-handling threadpool


【解决方案1】:

以下模型可以支持非常高的更新率,(达到每秒数千万)但您只需要在内存中保留最新的。


如果您每 N 毫秒拍摄一次快照,您可以使用这种方法。

final AtomicReference<ConcurrentHashMap<Key, Event>> mapRef =

当您有更新时,将其添加到 ConcurrentMap。选择键以便应该替换前一个事件的事件具有相同的键。

Key key = keyFor(event);
mapRef.get().put(key, event);

这种映射方式随时都有任何键的最新更新。

有一个每 N 毫秒运行一次的任务。此任务在运行时可以将地图换成另一个(或以前的空地图以避免创建新地图)

ConcurrentMap<Key, Event> prev = mapRef.set(prevEmptyMap);

for(Event e: prev.values())
    process(e);
prev.clear();
this.prevEmptymap = prev;

【讨论】:

  • 您能否详细说明一下消费者线程应该是什么样子。我不确定如何让消费者线程等待传入事件。有时可能会在较长时间(分钟、小时)内没有事件,有时会发生大量事件(每秒 100 秒)。
  • 我以为你说它每 250 毫秒运行一次,所以它不等待事件,它每 250 毫秒运行一次。如果这不是你的意思,你能澄清你的要求是什么吗?每秒 100 秒并不是很高,您应该能够通过简单的队列来支持这一点。如果你每秒有大约 10K 到 10M 的速度,那么你真的需要上面的地图。
  • 好吧,在我发布的上述示例中,我每 250 毫秒运行一个线程,如果没有要处理的传入事件,则等待新事件到达并立即处理它。在线程结束并再过 250 毫秒后,我将启动下一个线程,依此类推。使用普通队列是什么意思?我不想要任何基于 FIFO 的队列,而是需要一个只存储一个事件并且在消费者端阻塞,但在生产者端非阻塞的队列。
  • 所以你等待 250 毫秒然后立即处理下一个事件,然后再等待 250 毫秒等等?因此,一个事件会快速通过,而其间的每个事件都可能需要等待长达 250 毫秒?我没有说你想要一个队列或一个非阻塞消费者,我只是说这不会有什么真正的区别。 ;)
  • 是的,你是对的,如果每秒事件少于4个,每个事件都会被立即处理,但是如果更多,我会限制事件数量为每秒4个(1000/250) 但只接受最后一个传入事件并丢弃所有其他事件(我不需要它们)。
猜你喜欢
  • 2017-04-26
  • 2011-08-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-12-09
相关资源
最近更新 更多