【发布时间】:2015-05-19 20:19:43
【问题描述】:
我有 n 个生产者线程通过 BlockingQueue 提供 1 个消费者线程。我正在使用 .put 和 .take(后者在 .peek != null 时)。这对于至少十几条消息来说工作正常,除了不变的数据损坏,显然是在传输过程中。目前我只实例化一个生产者线程。
例如,生产者线程将识别一个矩形并设置对象值,然后通过该对象的 get 的调试行显示。损坏前设置的值的示例;
22:13:36.797 [Thread-1] DEBUG a.i.AdvancedVideoAnalytics - ROI = {574, 88, 42x110}
消费者然后 .take 消息,这里是提取值的调试行,其提取方式与前一个线程完全相同。显示了“损坏”值集的示例;
22:13:36.887 [Thread-0] DEBUG a.i.AwarenessAnalytics - ROI = {574, -1, 42x89}
相关生产者代码;
FrameWithMotionDetection frameWithMotionDetection;
private final BlockingQueue<FrameWithMotionDetection> queue;
...
frameWithMotionDetection = new FrameWithMotionDetection();
frameWithMotionDetection.setMotionData(contourAnalysisResults);
frameWithMotionDetection.setCurrentFrame(frameToExamine);
frameWithMotionDetection.setCamera(camera);
logger.debug("FrameWithMotionDetection.CameraID = {}", frameWithMotionDetection.getCamera().getCameraId());
System.out.println("Preparing to send message to AwarenessAnalytics thread");
try {
queue.put(frameWithMotionDetection);
}catch (InterruptedException ex) {
System.out.println("Exception in queue.put: " + ex );
}
主应用线程产生消费者线程;
FrameWithMotionDetection frameWithMotionDetection = new FrameWithMotionDetection();
BlockingQueue<FrameWithMotionDetection> q = new ArrayBlockingQueue<FrameWithMotionDetection>(1024);
AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(q);
相关的消费者代码;
public AwarenessAnalytics(BlockingQueue<FrameWithMotionDetection> q) {
this.queue = q;
}
...
FrameWithMotionDetection frameWithMotionDetection;
private final BlockingQueue<FrameWithMotionDetection> queue;
...
while (queue.peek() != null){
frameWithMotionDetection = new FrameWithMotionDetection();
try {
frameWithMotionDetection = queue.take();
frameWithMotionDetectionFromQueue.add(frameWithMotionDetection);
framesToEvaluate = true;
}catch (InterruptedException ex) {
logger.error("Exception in queue.take: {}", ex );
}
logger.debug("FrameMsg received");
}
生产者线程(AdvancedVideoAnalytics)由消费者线程产生;
tempIntermediateVA = new AdvancedVideoAnalytics(queue);
鉴于大多数数据传输的成功性质,BlockingQueue 是潜在问题还是我应该寻找其他地方?
更新:
在通过 BlockingQueue 发送之前,我正在努力敲定某些变量。这需要一个构造函数定义为;
public FrameWithMotionDetection(
ContourAnalysisResults motionData,
Mat currentFrame,
Camera camera) {
this.motionData = motionData;
this.currentFrame = currentFrame;
this.camera = camera;
}
现在我正在努力定义一个构造函数,让我可以简单地从 queue.take 调用中实例化对象;
frameWithMotionDetection = new FrameWithMotionDetection(queue.take());
或者这是错误的做法?
更新 2:在 .take() 之后直接插入调试语句,很明显问题不是 BlockingQueue,因此将检查其他方面。感谢大家的帮助。
更新 3:事实证明,我传递的复杂对象没有在消费者中实例化为新对象。我以为我创建了一个新实例,甚至使对象中的一些变量成为最终的。一旦我退出重置并重用生产者线程中的复杂对象(现在每次都创建一个新对象),问题就消失了。有几个人非常乐于助人,特别向@markspace 致敬。
【问题讨论】:
-
问题是,您使用的是哪个
BlockingQueue实现(并且可能使用任何非标准的JVM?)?众所周知,所有java.util.concurrent实现都是线程安全的。哦,使用peek != null似乎是多余的。take()将阻止,直到有东西可以采取。 -
您问的问题很有趣,因为我已经尝试了一些导致问题的实现(已经超过 2 个月,所以我不记得问题了),直到我尝试了您在上面看到的内容。这甚至应该工作吗?
-
在产生消费者的更高级别线程中(AwarenessAnalytics); FrameWithMotionDetection frameWithMotionDetection = new FrameWithMotionDetection(); BlockingQueue
q = new ArrayBlockingQueue (1024); AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(q);
标签: java multithreading corruption blockingqueue