【问题标题】:Concurrent processing of single InputStream with independent consumers单个 InputStream 与独立消费者的并发处理
【发布时间】:2012-07-05 07:18:39
【问题描述】:

我需要生成 N 个消费者线程,它们同时处理相同的 InputStream,例如 - 以某种方式对其进行转换、计算校验和或数字签名等。这些消费者不相互依赖,并且都使用第三方库,这接受 InputStream 作为数据源。

所以我能做的是 - 创建一些 InputStream 的实现,这将

  • 从“父”流中读取数据块
  • 取消屏蔽消费者
  • 等到每个消费者都读完整个块
  • 读取下一个块

虽然看起来很简单,但它可能会引发各种问题,例如某些消费者死亡时的活锁、实现所有 InputStream 方法、使用屏障/闩锁控制消费者自己的 fork/join 等。

一个哥们告诉我,实施需要半个小时,这让我度过了一个晚上。

我宁愿使用足够成熟的东西(谷歌搜索没有结果,因此我的 google-fu 不够好?)或者不打扰并将整个“源”流复制到临时文件中将其用作数据源。后一种解决方案似乎更可靠,但最终可能会创建千兆字节文件(例如在处理流式音频时)。

【问题讨论】:

  • 你可以将数据写入文件并生成 N 个 FileInputStreams 吗?
  • @JonLin 正如他在问题末尾所说的那样,他可以。

标签: java concurrency stream inputstream


【解决方案1】:

在我看来,您至少应该有某种缓冲,以便不同的消费者可以以不同的速度在流中移动,而不会一直被当前最慢的消费者拖住。这基本上确保了最坏情况下的性能和很少的并发好处。

例如,您可以用迄今为止使用过的消费者标记每个块,然后删除完全用完的消费者。也许这可以通过每个消费者持有对它尚未使用的每个块的引用来实现,这将允许 GC 自动处理已使用的块。生产者可能会为这些块保留一个WeakReferences 列表,这样它就可以处理尚未使用的块数量,并以此为基础进行节流。

我也在考虑每个线程有一个单独的InputStream 实例,它在内部与生产者InputStream 通信。这样,您就可以轻松解决活锁风险:try ... finally { is.close(); } - 垂死的消费者关闭自己的输入流。这会传达给生产者。

我对每个消费者使用ArrayBlockingQueue 有一些想法。在不让生产者阻塞或忙于等待的情况下确保所有消费者都得到适当的喂食会有些困难。

【讨论】:

  • 我不会说这没什么好处 - 让 5 个消费者工作 1 秒,一个消费者工作 2 秒,并发调用将给 2 秒,而顺序调用将给 7 秒。或者我在这里错过了什么?标记块和缓冲区后,我会遇到内存消耗,这是我想避免的。
  • 是的,你说的是不可避免的。但是,如果您的消费者平均平衡,但他们的表现差异很大,那么如果您总是等待当前落后的每个消费者,您将失去同意的机会。缓冲在那里会有所帮助。而如果引入线程优先级平衡,其实也可以实现这样的情况。
【解决方案2】:

您是否考虑过使用管道流?您的生产者可以有一个或多个PipedOuputStream,它会抛出从文件中读取的任何内容。在管道的另一端,您有不同的消费者线程读取相应的PipedInputstream(这是一个可以与您的库共享的 InputStream)。

您的生产者线程可以决定应该通过哪个管道发送数据,通过这种方式,为在管道另一侧读取的给定消费者线程提供要处理的数据。

如果您需要从消费者线程中取回数据,那么您可以创建另一个相反方向的管道,将数据发回给您。

【讨论】:

  • A PipedOutputStream 将在任何消费者落后时立即阻止生产者,使所有其他消费者挨饿。
【解决方案3】:

您可以尝试一些 Java 消息服务 (JMS) 实现,例如 Apache ActiveMQ

在您的情况下,您需要创建一个所谓的主题(请参阅Topics vs. Queues)。一个话题由生产者创建,并发布给 N 个消费者,这些消费者可以同时运行,每个消费者接收完全相同的数据。

由于您想使用InputStreams,因此有一章介绍如何使用send messages are streams

我想,生产者和消费者通常是独立的进程,可能运行在网络上的不同机器上。不过,我认为您可以将其配置为完全在单个 JVM 中运行。这将取决于 JMS 的实现。这些也很有名:HornetQ by JBossRabbitMQ 和一大堆其他人。

【讨论】:

    猜你喜欢
    • 2013-06-09
    • 2015-10-03
    • 1970-01-01
    • 1970-01-01
    • 2021-07-11
    • 2018-03-16
    • 1970-01-01
    • 1970-01-01
    • 2017-03-29
    相关资源
    最近更新 更多