【问题标题】:BlockingCollection.TryTake() exceeds timeoutBlockingCollection.TryTake() 超过超时
【发布时间】:2014-11-24 23:04:03
【问题描述】:

在我的应用程序中,我有几个线程用于处理 TCP 连接(一个用于读取,一个用于发送,一个用于处理新的传入连接)。每个线程为所有客户端处理给定类型的操作,因此假设它将数据发送到不同 IP 上的 5 个TcpClient 实例。我使用BlockingCollection 作为缓冲区,因为我从发送线程访问它,但也从另一个生成要发送的数据的线程访问它。我在发送线程中运行的函数如下所示:

    private void Sender()
    {
        while (run)
        {
            List<object[]> toRemove = new List<object[]>(); //bypass the fact that I cant remove a foreach iteration item from the BlockingCollection
            foreach (object[] obj in sendBuffer.ToList())
            {
                string IP = obj[0].ToString();
                string msg = obj[1].ToString();
                byte[] data = Encoding.UTF8.GetBytes(msg);
                foreach (TcpClient tcpc in TcpClients)
                {
                    if ((tcpc.Client.RemoteEndPoint as IPEndPoint).Address.ToString() == IP)
                    {
                        NetworkStream stream = tcpc.GetStream();
                        stream.Write(data, 0, data.Length);
                        break;
                    }
                }
                toRemove.Add(obj);
            }
            for (int i = 0; i < toRemove.Count; i++) //the bypass mentioned above
            {
                object[] rmv = toRemove[i];
                sendBuffer.TryTake(out rmv);
            }
        }
    }

注意:使用的BlockingCollection&lt;object[]&gt; 类型。我的问题是,在某个流量点,缓冲区开始填满。我在缓冲区中设置了最多 500 条消息的限制,它很容易溢出。 现在,如果我理解正确(不确定),TryTake 所做的就是尝试删除该项目,如果此时正在使用该集合,它会等待并重试。 (注意:我也尝试将超时设置为 50 毫秒)。如果这是真的(如果不是,请有人纠正我并提出不同的原因),问题可能是大多数时候调用 TryTake 时集合很忙。会是这样吗?如果是,如何解决?

对于集合的使用,生成数据的线程在 foreach 中大约每 2 秒访问一次集合,迭代范围为 1-80 个项目。缓冲区开始出现大约 20 多个项目的问题,直到那时,它都很好。 发件人线程现在只发送给一个客户端,以后会达到 15 个。所以在高峰期,这将是 80 个项目 x 15 个用户 = 大约每 2 秒 1200 次访问。 任何建议都非常感谢,谢谢。

【问题讨论】:

    标签: c# multithreading collections thread-safety blockingcollection


    【解决方案1】:

    TryTake 不像您描述的那样运行,默认 BlockingCollection 使用 ConcurrentQueue 来存储项目,TryTake 会将队列中的下一个项目分配给提供的输出引用。

    例如

    BlockingCollection<object[]> sendBuffer = new BlockingCollection<object[]>();
    
    object[] message = new object[2];
    object[] message2 = new object[2];
    // Add messages to the queue
    sendBuffer.Add(message);
    sendBuffer.Add(message2);
    
    object[] toSend;
    // Take next message off the queue
    sendBuffer.TryTake(out toSend);
    
    // toSend === message
    

    在您的情况下,您可以使用 BlockingCollection.Take() 等待消息发送:

    BlockingCollection<object[]> sendBuffer = new BlockingCollection<object[]>();
    // CancellationTokenSource is used in place of your run variable.
    System.Threading.CancellationTokenSource cancellationSource 
    = new System.Threading.CancellationTokenSource();
    
        private void Sender()
        {
            // Exit loop if cancellation requested
            while (!cancellationSource.Token.IsCancellationRequested)
            {
    
                object[] obj;
    
                try {
                    // Blocks until an item is available in sendBuffer or cancellationSource.Cancel() is called.
                    obj = sendBuffer.Take(cancellationSource.Token);
                } catch (OperationCanceledException) {
                    // OperationCanceledException will be thrown if cancellationSource.Cancel() 
                    // is called during call to sendBuffer.Take
                    break;
                } catch (InvalidOperationException) {
                    // An InvalidOperationException means that Take() was called on a completed collection.
                    // See BlockingCollection<T>.CompleteAdding
                    break;
                }
    
                string IP = obj[0].ToString();
                string msg = obj[1].ToString();
                byte[] data = Encoding.UTF8.GetBytes(msg);
                foreach (TcpClient tcpc in TcpClients) {
                    if ((tcpc.Client.RemoteEndPoint as IPEndPoint).Address.ToString() == IP) {
                        NetworkStream stream = tcpc.GetStream();
                        stream.Write(data, 0, data.Length);
                        break;
                    }
                }               
            }
        } 
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-04-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-16
      • 2010-12-29
      • 2011-01-07
      相关资源
      最近更新 更多