【问题标题】:Concurrent queue is not dequeued/cleared并发队列未出队/清除
【发布时间】:2017-06-26 14:52:11
【问题描述】:

我的要求是将项目插入队列并进行处理,但应先添加项目,然后再处理它们(因为在处理项目之前需要设置一些其他的东西. 这是我到目前为止所做的编码。

    #region Variables Declarations

    private Thread threadTask = null;

    ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>();     
    string currentSeqNo;
    string previousSeqNo = "-1";        

    #endregion

    private void test1_Load(object sender, EventArgs e)
    {
        AddItems();           

        if (threadTask == null)
        {
            threadTask = new Thread(Kick);
            Thread.Sleep(5000);               
            threadTask.Start();
        }
    }

    private void AddItems()
    {
        for (Int64 i = 100000; i < 300000; i++)
        {                
            concurrentQueue.Enqueue(i.ToString());
            this.Invoke(new MethodInvoker(delegate()
            {                    
                label1.Text = i.ToString();
                label1.Update();                   
            }));
        }
    }

    private void Kick()
    {
        while (true)
        {
            int recordCountNew = concurrentQueue.Count();
            if (recordCountNew != 0)
            {
                RemoveItems();
            }
        }
    }

    private void RemoveItems()
    {
        string item;

        while (concurrentQueue.TryDequeue(out item))
        {
            this.Invoke(new MethodInvoker(delegate()
            {                   
                label2.Text = item;
                label2.Update();
            }));

            currentSeqNo = item;    // second time does not start wil 100000

            if (previousSeqNo != "-1")
            {
                if (long.Parse(currentSeqNo) != long.Parse(previousSeqNo) + 1)
                {
                    Reconnect();
                }
                else
                {
                    //Process item
                    previousSeqNo = currentSeqNo;
                }
            }
            else
            {
                //Process item
                previousSeqNo = currentSeqNo;
            }
        }
    }

    private void Reconnect()
    {            
        currentSeqNo = "";
        previousSeqNo = "-1";

        string someItem;
        while (concurrentQueue.Count > 0)
        {
            concurrentQueue.TryDequeue(out someItem);
        }

        this.Invoke(new MethodInvoker(delegate()
        {
            label1.Text = "";
            label2.Text = "";

            label1.Update();
            label2.Update();
        }));

        AddItems();

        if (threadTask == null)
        {
            threadTask = new Thread(Kick);                
            threadTask.Start();
        }
    }

    private void button1_Click_1(object sender, EventArgs e)
    {
        Reconnect();
    }

重现问题:运行应用程序并在中间单击按钮。现在队列应该再次从 100000 开始,但它显示的数字大于 100000。

请告知单击按钮后如何释放所有资源以重新开始。虽然我将它们设置为默认值并清除队列,但是当调用“RemoveItems”方法时,它仍然显示 currentSeqNo 中的旧值。

【问题讨论】:

  • 如果您的删除项目比您添加的项目更快,您的队列提前结束,您应该考虑使用BlockingCollection&lt;string&gt;,它使用ConcurrentQueue 作为默认支持集合。
  • 我试过 BlockingCollection 但情况还是一样。
  • 为什么不创建一个新的 ConcurrentQueue 并交换引用?
  • ConcurrentQueue wssMessagesQueue = new ConcurrentQueue(); concurrentQueue = wssMessagesQueue;
  • 试过这个.. 但还是不太幸运

标签: c# multithreading winforms


【解决方案1】:

您看到的是 Kick 线程和按钮单击处理程序之间的竞争条件。当您按下执行 Reconnect() 的按钮时,您会清理队列,然后调用 AddItems() 函数。但一直以来,Kick 函数都在尝试出队,因此每次都会在其中包含任意数量的项目。您应该做的是在这些函数之间进行同步,或者在添加项目时阻止 Kick 执行。

【讨论】:

  • 感谢杰森的理解。我被困得很厉害,以至于我不知道如何停止线程并重新启动它。我不知道如何进行同步。尝试了很多东西,但没有什么符合我的要求,即并行运行两个线程,其中一个正在排队,另一个正在出队。必须先开始接收消息,然后再处理它们。它第一次完美地工作,但是当我需要再次执行相同的步骤时,它无法正常工作,因为线程没有停止并且我得到任意值。你能建议如何使它们同步吗?
  • 只需按照@Scott Chamberlain 的建议更改为 BlockingCollection 即可。使用 TryAdd/TryTake。此外,当您检查时,请确保您正在检查 AddItems() 调用后的状态。
  • 我尝试了blockingcollection,但结果是一样的。 AddItems() 调用将始终在集合中添加值。那么我在哪里检查状态 - 在 Reconnect() 方法中?我该怎么做?
【解决方案2】:

几个cmets:

1) 你Kick() 方法有一个无限循环,也没有睡眠。每个启动的线程都将继续运行,因为您没有线程出现的范围。

你可以有一个像bKeepRunning 这样的成员变量,默认值为true。在Reconnect() 函数的开头将该变量设置为false。比如:

private void Kick()
{
    while (bKeepRunning)
    {
        int recordCountNew = concurrentQueue.Count();
        if (recordCountNew != 0)
        {
            RemoveItems();
        }
    }
}
  1. 为什么test1_Load() 中有Thread.Sleep(5000);?我不认为这是必要的。

我对你的代码做了一些小改动,比如:

private void AddItems()
{
    for (Int64 i = 100000; i < 300000; i++)
    {
        concurrentQueue.Enqueue(i.ToString());

        this.Invoke(new MethodInvoker(delegate()
        {
            label1.Text = i.ToString();
            label1.Update();
        }));

        if (i < 100004)
            Thread.Sleep(1000);
    }
}

private void Kick()
{
    while (bKeepRunning)
    {
        int recordCountNew = concurrentQueue.Count();
        if (recordCountNew != 0)
        {
            RemoveItems();
        }
    }
} 

private void Reconnect()
{
    currentSeqNo = "";
    previousSeqNo = "-1";
    bKeepRunning = false;
    threadTask = null;

    string someItem;
    while (concurrentQueue.Count > 0)
    {
    concurrentQueue.TryDequeue(out someItem);
    }

    this.Invoke(new MethodInvoker(delegate()
    {
    label1.Text = "";
    label2.Text = "";

    label1.Update();
    label2.Update();
    }));
    Thread.Sleep(2000);

    AddItems();

    bKeepRunning = true;


    if (threadTask == null)
    {
    threadTask = new Thread(Kick);
    threadTask.Start();
    }
}

它帮助我看到该值是从 100000 开始的。您可以在最后尝试相同的方法。

注意:我已经停止线程并在单击按钮后重新启动。因此,我在您的代码中没有看到任何缺陷。它只是运行得很快,因此您无法实现起始值。

【讨论】:

  • 使用上面的代码来演示停止/重启线程。前 4 个值之前的 1 秒延迟将让您注意到它从 100000 重新开始。
【解决方案3】:

你应该让 UI 线程threadTask 线程 同步,只需使用 ManualResetEventSlim 信号构造即可,如下所示:

static ManualResetEventSlim guard = new ManualResetEventSlim(true);
private void button1_Click_1(object sender, EventArgs e)
{
    guard.Reset();
    Reconnect();
    guard.Set();
}
private void RemoveItems()
{
    string item;

    while (concurrentQueue.TryDequeue(out item))
    {
        guard.Wait();
        //......
     }
}

见:

ManualResetEventSlim Class

【讨论】:

  • 不起作用..它仍然给出随机数并且不以 100000 开头
  • 也许你可以在 AddItems 方法处设置一个断点来查看确切的数字。
  • AddItems 总是以 100000 开头,但是在出队时它需要随机数,因为这是一个单独的线程并且它不会清除资源
  • 另外,AddItems 只是生产中的一个测试应用程序,数量会不断增加
  • @user1254053,当你调用guard.Reset()时,threadTask线程会被阻塞,直到你调用guard.Set()。所以队列中的所有资源都会被清空。
猜你喜欢
  • 2017-06-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-06-16
  • 2015-11-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多