【问题标题】:Synchronization between multiple threads多线程之间的同步
【发布时间】:2014-06-09 10:22:04
【问题描述】:

我很少有线程可以调用两个或多个方法。我需要同步它们,所以我尝试使用barrier类:

Barrier barrier = new Barrier(2); // 2 = #threads participating.
bool complete = false;
TaskFactory factory = Task.Factory;

// Start tasks
Task task_1 = factory.StartNew(() =>
{
    process_1.Server("1 and 2");
    barrier.SignalAndWait(); // Wait for task 2 to catch up.
    barrier.SignalAndWait(); // Wait for task 2 to print "2" and set complete = true.

    if (complete)
    {
        process_1.Server("1 and 3");
    }
});
Task task_6 = factory.StartNew(() =>
{
    process_6.Server("6 and 4");
    process_6.Server("6 and 3");
});
Task task_2 = factory.StartNew(() =>
{
    barrier.SignalAndWait(); // Wait for task 1 to print "1".
    process_2.Client("1 and 2");
    complete = true;
    barrier.SignalAndWait(); // Wait for task 1 to read complete as true.

    process_2.Server("2 and 5");
    process_2.Server("2 and 3");
});
Task task_4 = factory.StartNew(() =>
{
    process_4.Client("6 and 4");
    process_4.Server("4 and 7");
    process_4.Server("4 and 3");
});
Task task_5 = factory.StartNew(() =>
{
    process_5.Client("2 and 5");
    process_5.Server("5 and 3");
});
Task task_7 = factory.StartNew(() =>
{
    process_7.Client("4 and 7");
    process_7.Server("7 and 3");
});
Task task_3 = factory.StartNew(() =>
{
    process_3.Client("1 and 3");
    process_3.Client("2 and 3");
    process_3.Client("4 and 3");
    process_3.Client("5 and 3");
    process_3.Client("6 and 3");
    process_3.Client("7 and 3");
});

task_3.Wait();

我需要确保从不同线程调用方法之间的结果,例如:process_1.Server("1 and 2");process_2.Client("1 and 2");。在Server 之前调用Client 方法是不可接受的。所有依赖:{process_1.Server("1 and 2"); process_2.Client("1 and 2");}, {process_2.Server("2 and 5"); process_5.Client("2 and 5");}, {process_6.Server("6 and 4"); process_4.Client("6 and 4");}, {process_4.Server("4 and 7"); process_7.Client("4 and 7");}, {process_1.Server("1 and 3"); process_3.Client("1 and 3");}, {process_2.Server("2 and 3"); process_3.Client("2 and 3");}, {process_4.Server("4 and 3"); process_3.Client("4 and 3");}, {process_5.Server("5 and 3"); process_3.Client("5 and 3");}, {process_6.Server("6 and 3"); process_3.Client("6 and 3");}, {process_7.Server("7 and 3"); process_3.Client("7 and 3");}.

在元素 {...}{...} 之间没有依赖关系。因此可以执行{process_6.Server("6 and 3"); process_3.Client("6 and 3");}, {process_7.Server("7 and 3"); process_3.Client("7 and 3");},反之亦然{process_7.Server("7 and 3"); process_3.Client("7 and 3");}, {process_6.Server("6 and 3"); process_3.Client("6 and 3");}。我写的{...} 中的元素之间存在依赖关系。你能帮我解决这个问题吗?我不知道如何实现这一点。

非常感谢!

完整的程序代码:

class Pipe
{
    public string message;

    public Pipe()
    {
        message = "";
    }

    public Pipe(string message)
    {
        this.message = message;
    }

    public void Server(object pipeName)
    {
        // Create a name pipe
        using (NamedPipeServerStream pipeStream = new NamedPipeServerStream(pipeName.ToString()))
        {
            // Wait for a connection
            pipeStream.WaitForConnection();

            using (StreamWriter sw = new StreamWriter(pipeStream))
            {
                sw.AutoFlush = true;
                sw.WriteLine(message);
            }
        }

        Console.Write("Communication between processes " + pipeName.ToString());
    }

    public void Client(object pipeName)
    {
        using (NamedPipeClientStream pipeStream = new NamedPipeClientStream(pipeName.ToString()))
        {
            // The connect function will indefinately wait for the pipe to become available
            // If that is not acceptable specify a maximum waiting time (in ms)
            pipeStream.Connect();

            using (StreamReader sr = new StreamReader(pipeStream))
            {
                // We read a line from the pipe and print it together with the current time
                message += sr.ReadLine();
            }
        }

        Console.WriteLine(": client received message.\n");
    }

    static void Main(string[] args)
    {

            Pipe process_1 = new Pipe("Test message from process #1.");
            Pipe process_2 = new Pipe();
            Pipe process_3 = new Pipe();
            Pipe process_4 = new Pipe();
            Pipe process_5 = new Pipe();
            Pipe process_6 = new Pipe("Test message from process #6.");
            Pipe process_7 = new Pipe();

            TaskFactory factory = Task.Factory;

            // Start tasks
            Task task_1 = factory.StartNew(() => { process_1.Server("1 and 2"); process_1.Server("1 and 3"); });
            Task task_6 = factory.StartNew(() => { process_6.Server("6 and 4"); process_6.Server("6 and 3"); });
            Task task_2 = factory.StartNew(() => { process_2.Client("1 and 2"); process_2.Server("2 and 5"); process_2.Server("2 and 3"); });
            Task task_4 = factory.StartNew(() => { process_4.Client("6 and 4"); process_4.Server("4 and 7"); process_4.Server("4 and 3"); });
            Task task_5 = factory.StartNew(() => { process_5.Client("2 and 5"); process_5.Server("5 and 3"); });
            Task task_7 = factory.StartNew(() => { process_7.Client("4 and 7"); process_7.Server("7 and 3"); });
            Task task_3 = factory.StartNew(() => { process_3.Client("1 and 3"); process_3.Client("2 and 3"); process_3.Client("4 and 3"); process_3.Client("5 and 3"); process_3.Client("6 and 3"); process_3.Client("7 and 3"); });

            task_3.Wait();
    }
}

【问题讨论】:

  • 能否简单解释一下process_n的变量类型是什么?是不是你自己写的一个类,包含ServerClient这两个方法,string作为sincle输入参数,void返回值?您是否尝试过首先最小化您的问题,例如从只有 2 个线程开始,然后通过添加更多线程来增加复杂性?
  • 我同意@isi 的观点,我在阅读这个问题时脑子转不过来......
  • 我添加了完整程序的代码。是的,你理解正确:它是我自己编写的一个类,包含 Server 和 Client 两种方法,字符串作为输入参数和 void 返回值
  • @HABJAN,对不起,我试图解释我的任务。问题是:如果我在主类中添加循环,有时客户端方法在服务器之前启动,或者两个服务器在客户端之前启动。所以在控制台我有Communication between processes 1 and 3Communication between processes 1 and 3 而不是Communication between processes 1 and 3: client received message. 你能帮我同步这些任务吗?

标签: c# multithreading synchronization


【解决方案1】:

如果我对您的理解正确,您需要确保在您的Pipe 对象上调用Server 之前,永远不会调用方法Client。我已将您的示例简化为并添加了一个测试类来记录行为。简化后的代码包含一个更简单的Pipe 类形式,它现在只将一些字符串放在作为参数传入 c'tor 的列表中,而不是创建真正的管道。

同步完全由Pipe 的装饰子类BlockingPipe 处理。 BlockingPipe 使用一些称为条件锁或条件同步的低级机制。 Jeff Magee 和 Jeff Kramer 写了一本关于并发模式及其在 Java 中的应用的好书,请查看 condition sync. (java) slides 12-14 或 c# condition sync. in c#,尤其是查看 @john skeet 的答案,他指出了另一个很好的参考。该模式包括使用pulse method 通知所有等待的线程。

足够的理论,回到你的代码。这是简化的Pipe 类:

class Pipe
{
    internal static int counter = 0;
    private readonly int id = counter++;
    private readonly IList<string> calls;
    public Pipe(IList<string> calls) { this.calls = calls; }
    public virtual void Server(string s) { EnqueeCall(s, "server"); }
    public virtual void Client(string s) { EnqueeCall(s, "client"); }
    private void EnqueeCall(string s, string actor)
    {
        calls.Add(actor + id + " processes " + s);
    }
}

现在BlockingPipe 类使用条件同步。给定BlockingPipe 对象的条件和状态可以建模为有限状态机。您的BlockingPipe 可以处于两种状态 - ServerCalled 和 ServerNotCalled。状态用于维护每个方法的这种依赖关系。子类委托给基类的实现,以便更好地分离常用逻辑和同步逻辑:

class BlockingPipe : Pipe
{
    public BlockingPipe(IList<string> calls) : base(calls) { }
    private enum State { ServerCalled, ServerNotCalled }
    private State state = State.ServerNotCalled;
    public override void Server(string s)
    {
        lock (this)
        {
            base.Server(s);
            state = State.ServerCalled;
            Monitor.Pulse(this);
        }
    }
    public override void Client(string s)
    {
        lock (this)
        {
            while (state != State.ServerCalled)
                Monitor.Wait(this, 200);
            base.Client(s);
        }
    }
}

最后一步是测试类。

[TestClass]
public class SomeTestClass
{
    [TestMethod]
    public void TestMethod()
    {
        for (var i = 0; i < 100; i++) Test();
    }

    private static void Test()
    {
        Pipe.counter = 0;
        var list = new List<string>();
        var p = new BlockingPipe(list);
        var f = Task.Factory;
        var b = new Barrier(3);
        f.StartNew(() => { p.Client("asdf"); b.SignalAndWait(); });
        f.StartNew(() => { p.Server("qwer"); b.SignalAndWait(); });
        b.SignalAndWait();
        var exp = String.Join("\n", 
          new[] { "server0 processes qwer", "client0 processes asdf" });
        var act = String.Join("\n", list);
        Assert.AreEqual(exp, act);
    }
}

Test 方法可以被调用任意多次(希望)总是产生正确的行为。我希望这可以扩展到您的用例。测试检查在 Pipe 上执行的调用是否具有该形式:

server0 进程 qwer client0 处理 asdf

两个创建的线程共享Pipe 对象的同一个实例。为了测试这个解决方案的稳健性,我添加了一个 for 循环调用实际的 Test 方法一百次,总是产生相同的结果。我注意到的唯一缺陷是条件同步模式的实现本身无法在BlockingPipewhile 循环内的Monitor.Wait 调用上添加超时。因为我有一个线程永远等待接收Pulse,但脉冲线程已经返回,这可能还需要为这种情况添加另一个条件。

【讨论】:

    【解决方案2】:

    您可能想要创建一个自定义的SynchronizationContext,并拥有自己的任务队列。然后根据它们的依赖关系选择要执行的任务。这个link 可以向您展示如何设置自定义上下文以及如何使用它。

    【讨论】:

    • 我读了这篇文章。但我不明白如何停止一个线程的实现并在某些事件后恢复它。
    猜你喜欢
    • 1970-01-01
    • 2011-05-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-06
    • 2017-12-08
    • 1970-01-01
    相关资源
    最近更新 更多