【发布时间】:2017-05-02 12:22:57
【问题描述】:
我有一个linkedblockingqueue,在处理列表时我看到重复的消息。任何人都知道使用这种方法如何产生重复?下面是队列的声明,然后是每个生产者和消费者的 run() 函数。
当我在一个线程中执行此操作时,我不会重复。这意味着我不使用队列,只是从 UDP 输入中读取并在单个线程中直接调用 ProcessScadaMsg newMessage = new ProcessScadaMsg(byte[]) 。这导致了问题,因为 UDP 消息进来太快并且被遗漏了,所以我不得不将它们分成生产者/消费者。
//Declaration
public static BlockingQueue<byte[]> UDPMessageQueue = new LinkedBlockingQueue<byte[]>();
// Producer
public void run()
{
DatagramSocket receiveSock = null;
// Create socket for receiving data
try
{
receiveSock = new DatagramSocket(port);
} catch (SocketException e2)
{
// TODO Auto-generated catch block
errorLog.error("Unable to open socket.");
}
while (true)
{
// buffer to receive incoming data
byte[] buffer = new byte[DataAdapterFB1.HEADER_SIZE + DataAdapterFB1.MAX_DATA_BYTES];
DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);
try
{
receiveSock.receive(incoming);
} catch (IOException e1)
{
errorLog.fatal("Failed to read from IO port.");
System.exit(1);
} catch (NullPointerException e2)
{
errorLog.fatal("IO Port unavailable or in use.");
System.exit(1);
}
DataAdapterFB1.UDPMessageQueue.add(incoming.getData());
}
// Consumer
public void run()
{
while (true)
{
try
{
ProcessScadaMsg newMessage = new ProcessScadaMsg(DataAdapterFB1.UDPMessageQueue.take());
} catch (InterruptedException e)
{
errorLog.warn("Queue processing interrupted.");
}
}
}
【问题讨论】:
标签: java queue blockingqueue