【问题标题】:Java Producer/Consumer using LinkedBlockingQueue processing duplicates?Java Producer/Consumer 使用 LinkedBlockingQueue 处理重复项?
【发布时间】:2017-05-02 12:22:57
【问题描述】:

我有一个linkedblockingqueue,在处理列表时我看到重复的消息。任何人都知道使用这种方法如何产生重复?下面是队列的声明,然后是每个生产者和消费者的 run() 函数。

当我在一个线程中执行此操作时,我不会重复。这意味着我不使用队列,只是从 UDP 输入中读取并在单个线程中直接调用 ProcessScadaMsg newMessage = new ProcessScadaMsg(byte[]) 。这导致了问题,因为 UDP 消息进来太快并且被遗漏了,所以我不得不将它们分成生产者/消费者。

//Declaration
public static BlockingQueue<byte[]> UDPMessageQueue = new LinkedBlockingQueue<byte[]>();

// Producer
public void run()
{
    DatagramSocket receiveSock = null;
    // Create socket for receiving data
    try
    {
        receiveSock = new DatagramSocket(port);
    } catch (SocketException e2)
    {
        // TODO Auto-generated catch block
        errorLog.error("Unable to open socket.");
    }



    while (true)
    {
        // buffer to receive incoming data
        byte[] buffer = new byte[DataAdapterFB1.HEADER_SIZE +  DataAdapterFB1.MAX_DATA_BYTES];
        DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);
        try
        {
            receiveSock.receive(incoming);
        } catch (IOException e1)
        {
            errorLog.fatal("Failed to read from IO port.");
            System.exit(1);
        } catch (NullPointerException e2)
        {
            errorLog.fatal("IO Port unavailable or in use.");
            System.exit(1);
        }

        DataAdapterFB1.UDPMessageQueue.add(incoming.getData());

    }


// Consumer
public void run()
{

    while (true)
    {
        try
        {
            ProcessScadaMsg newMessage = new ProcessScadaMsg(DataAdapterFB1.UDPMessageQueue.take());
        } catch (InterruptedException e)
        {
            errorLog.warn("Queue processing interrupted.");
        }            
    }

}

【问题讨论】:

    标签: java queue blockingqueue


    【解决方案1】:

    您正在创建一个包含缓冲区的 DatagramPacket 对象,然后您将在该数据包的循环中多次接收。因此,假设您连续收到两个数据包。当收到第一个节点时,缓冲区会被填充,并作为第一个节点添加到列表中。当接收到第二个数据包时,它被写入内存中的同一个缓冲区,并且该缓冲区在第二个节点中排队。

    因此,从消费者的角度来看,当第一个节点出队时,它将指向与第二个节点相同的缓冲区,第二个节点的内容已被第二次读取覆盖。

    需要在循环中分配缓冲区和DatagramPacket:

    while (true)
    {
      // buffer to receive incoming data
      byte[] buffer = new byte[DataAdapterFB1.HEADER_SIZE + DataAdapterFB1.MAX_DATA_BYTES];
      DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);
      ...
    

    【讨论】:

    • 谢谢罗伯托。过完年回办公室一定要试试这个。
    • 这可以修复重复项,但已经充分减慢了队列不断增长并且无法足够快地出列的处理速度。有什么想法吗?
    • CPU 使用率也是一个问题。
    • 不确定您是否已经这样做了,但您希望有多个线程从队列中使用,以利用多核架构。这不会为您的直接处理节省 CPU,但应该可以足够快地消耗而不会使队列增长太多;垃圾收集器不会启动那么多,这应该会间接节省 CPU 周期
    • 我觉得队列没问题。您可能希望使其有界(在构造函数中指定容量),以便您可以对发送者造成背压。如果数据包大小相对较小,您还可以在创建 DatagramPacket 实例时使用单字节 [] 缓冲区和 slicing 进行调查,这里的想法是后续数据包之间会有更多的内存局部性,应该使用缓存更有效。
    猜你喜欢
    • 2019-08-06
    • 1970-01-01
    • 1970-01-01
    • 2017-04-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多