【问题标题】:Java BlockingQueue appears to corrupt data during transferJava BlockingQueue 似乎在传输过程中损坏了数据
【发布时间】:2015-05-19 20:19:43
【问题描述】:

我有 n 个生产者线程通过 BlockingQueue 提供 1 个消费者线程。我正在使用 .put 和 .take(后者在 .peek != null 时)。这对于至少十几条消息来说工作正常,除了不变的数据损坏,显然是在传输过程中。目前我只实例化一个生产者线程。

例如,生产者线程将识别一个矩形并设置对象值,然后通过该对象的 get 的调试行显示。损坏前设置的值的示例;

22:13:36.797 [Thread-1] DEBUG a.i.AdvancedVideoAnalytics - ROI = {574, 88, 42x110}

消费者然后 .take 消息,这里是提取值的调试行,其提取方式与前一个线程完全相同。显示了“损坏”值集的示例;

22:13:36.887 [Thread-0] DEBUG a.i.AwarenessAnalytics - ROI = {574, -1, 42x89}

相关生产者代码;

FrameWithMotionDetection frameWithMotionDetection;
private final BlockingQueue<FrameWithMotionDetection> queue;
... 
frameWithMotionDetection = new FrameWithMotionDetection();
frameWithMotionDetection.setMotionData(contourAnalysisResults);
frameWithMotionDetection.setCurrentFrame(frameToExamine);
frameWithMotionDetection.setCamera(camera);
logger.debug("FrameWithMotionDetection.CameraID = {}", frameWithMotionDetection.getCamera().getCameraId());
System.out.println("Preparing to send message to AwarenessAnalytics thread");
try {
    queue.put(frameWithMotionDetection);
    }catch (InterruptedException ex) { 
       System.out.println("Exception in queue.put: " + ex );
    }

主应用线程产生消费者线程;

FrameWithMotionDetection frameWithMotionDetection = new FrameWithMotionDetection();
BlockingQueue<FrameWithMotionDetection> q = new ArrayBlockingQueue<FrameWithMotionDetection>(1024);
AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(q);

相关的消费者代码;

public AwarenessAnalytics(BlockingQueue<FrameWithMotionDetection> q) {
          this.queue = q;
}
...
FrameWithMotionDetection frameWithMotionDetection;
private final BlockingQueue<FrameWithMotionDetection> queue;
...
while (queue.peek() != null){
    frameWithMotionDetection = new FrameWithMotionDetection();
    try {

        frameWithMotionDetection = queue.take();
        frameWithMotionDetectionFromQueue.add(frameWithMotionDetection);
        framesToEvaluate = true;
        }catch (InterruptedException ex) { 
           logger.error("Exception in queue.take: {}", ex );
        }

    logger.debug("FrameMsg received");
    }

生产者线程(AdvancedVideoAnalytics)由消费者线程产生;

tempIntermediateVA = new AdvancedVideoAnalytics(queue);

鉴于大多数数据传输的成功性质,BlockingQueue 是潜在问题还是我应该寻找其他地方?

更新:

在通过 BlockingQueue 发送之前,我正在努力敲定某些变量。这需要一个构造函数定义为;

public FrameWithMotionDetection(
    ContourAnalysisResults motionData,
    Mat currentFrame,
    Camera camera) {
    this.motionData = motionData;
    this.currentFrame = currentFrame;
    this.camera = camera;
}

现在我正在努力定义一个构造函数,让我可以简单地从 queue.take 调用中实例化对象;

frameWithMotionDetection = new FrameWithMotionDetection(queue.take());

或者这是错误的做法?

更新 2:在 .take() 之后直接插入调试语句,很明显问题不是 BlockingQueue,因此将检查其他方面。感谢大家的帮助。

更新 3:事实证明,我传递的复杂对象没有在消费者中实例化为新对象。我以为我创建了一个新实例,甚至使对象中的一些变量成为最终的。一旦我退出重置并重用生产者线程中的复杂对象(现在每次都创建一个新对象),问题就消失了。有几个人非常乐于助人,特别向@markspace 致敬。

【问题讨论】:

  • 问题是,您使用的是哪个BlockingQueue 实现(并且可能使用任何非标准的JVM?)?众所周知,所有java.util.concurrent 实现都是线程安全的。哦,使用peek != null 似乎是多余的。 take() 将阻止,直到有东西可以采取。
  • 您问的问题很有趣,因为我已经尝试了一些导致问题的实现(已经超过 2 个月,所以我不记得问题了),直到我尝试了您在上面看到的内容。这甚至应该工作吗?
  • 在产生消费者的更高级别线程中(AwarenessAnalytics); FrameWithMotionDetection frameWithMotionDetection = new FrameWithMotionDetection(); BlockingQueue q = new ArrayBlockingQueue(1024); AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(q);

标签: java multithreading corruption blockingqueue


【解决方案1】:

如果没有所有代码,就很难准确说出问题所在。但是根据您给我们的信息,您正在为所有线程使用共享的FrameWithMotionDetection 对象。

如果您将FrameWithMotionDetection 定义在与BlockingQueue 相同的级别和范围,那么您做错了。

在方法中定义FrameWithMotionDetection,不要让该方法转义。


这肯定与BlockingQueue无关。

【讨论】:

  • 我认为frameWithMotionDetection 还可以,但他设置的值似乎值得怀疑。请参阅我对 Zim-Zam 回答的评论。
  • @Will 是的,没错。如果您在 run 方法中声明它,它应该会更正确地运行。
  • @JohnVint 我不关心FrameWithMotionDetection。正是他放入该对象的三个值似乎更有可能(意外地)在多个线程之间共享,并且在消费者有机会读取和输出它们之前发生变异。
  • @JohnVint 我绝对同意 OP 没有向我们展示足够的代码来确定问题可能是什么。我特别想看看这三个价值观的创造和所有用途。
  • @Will 那么 Zim-Zam 的回答很可能是罪魁祸首。
【解决方案2】:
  1. 我建议重写 FrameWithMotionDetection,以便所有属性都在构造函数中设置并且也是最终的 - 与多线程环境中的可变数据相比,不可变数据更不容易损坏

  2. 调用peek 然后take 具有潜在危险,因为队列可能在两个方法调用之间被清空(导致消费者无限期挂起)。更安全的替代方法是 poll 超时,或者 take 在生产者完成后调用中断消费者

【讨论】:

  • 我也想说同样的话。在frameWithMotionDetection 内部,contourAnalysisResultsframeToEximecamera 是不可变且线程安全的吗?如果不是,大概就是这样。当这些值为set 时,进行deep 防御性复制。否则另一个线程可能会在你背后更新它们。
  • 1.我认为该建议 2 的优点。这是始终只有 1 个且只有一个消费者的问题吗?
  • @Will 只有一个消费者,peektake 之间没有队列清空的风险,但是如果消费者清空队列的速度比生产者可以填充它。为了防止这种情况,您可以使用一个共享的AtomicInteger,它被初始化为生产者线程的数量,并且生产者在它终止时会减少它;如果AtomicInteger 大于零,则消费者可以调用take,否则它会调用poll,在队列清空之前不会超时
  • @Zim-ZamO'Pootertoot 这是一个令人信服的建议。生产者循环播放视频流,每当检测到运动时生成队列条目。消费者将不断循环到 .peek 队列,如果队列中没有任何内容,则休眠 0.050 秒,然后再次循环。
  • @Will 你可以创建一个复制构造函数:FrameWithMotionDetection(FrameWithMotionDetection original),它调用你的构造函数来设置最终属性,但这可能不是必需的 - 如果对象是不可变的,那么消费者不需要复制一份
猜你喜欢
  • 1970-01-01
  • 2016-05-27
  • 2013-04-25
  • 2014-03-03
  • 2022-12-14
  • 1970-01-01
  • 2013-05-23
  • 2020-03-18
  • 1970-01-01
相关资源
最近更新 更多