【问题标题】:How to determine that TransactionScope has called all commits?如何确定 TransactionScope 已调用所有提交?
【发布时间】:2014-02-15 18:04:44
【问题描述】:

EDITED似乎在使用分布式事务(EnterpriseServicesInteropOption.Full)和持久订阅者时,TransactionScope.Dispose 方法不会等待所有提交完成,但只是将方法调用和 TransactionCompleted 事件生成到后台线程。

这不符合明确说明的文档:

此方法是同步的,并且在事务提交或中止之前一直阻塞。

更糟糕的是,似乎没有办法确定所有提交的处理时间。这是有问题的,因为在控制台应用程序中,主线程可以在 dispose 后退出,这有效地杀死了所有后台线程。分布式事务中的远程参与者永远不会被告知这一点,导致锁保持打开、超时和其他丑陋的事情......

另一个问题是,当创建新的 TransactionScope 时,参与者仍然可以与旧事务相关联,而他们应该在新事务中注册。

下面的(简化的)代码演示了这个问题。

我的问题:是否有人知道如何确定开始一个新循环是否安全(还)?我无权访问 Worker 的代码,所以我无法更改其中的任何内容...添加 Thread.Sleep(1000) 可以解决问题,但这会降低性能...

已编辑

internal class TransactionScopeTest
{
    [STAThread]
    public static void Main()
    {
        var transactionOptions = new TransactionOptions { Timeout = TransactionManager.DefaultTimeout };
        var worker = new Worker();
        var transactionCompletedEvent = new AutoResetEvent(true); // true to start a first loop

        while (true)
        {
            transactionCompletedEvent.WaitOne(); // wait for previous transaction to finish

            Log("Before TransactionScope");
            using (var tx = new TransactionScope(TransactionScopeOption.Required, transactionOptions, EnterpriseServicesInteropOption.Full))
            {
                Log("Inside TransactionScope");
                Transaction.Current.TransactionCompleted += delegate
                {
                    transactionCompletedEvent.Set(); // allow a next loop to start
                    Log("TransactionCompleted event");
                };
                worker.DoWork();
                Log("Before commit");
                tx.Complete();
                Log("Before dispose");
            }
            Log("After dispose");
        }
    }

    private static void Log(string message)
    {
        Console.WriteLine("{0} ({1})", message, Thread.CurrentThread.ManagedThreadId);
    }


    public class Worker : IEnlistmentNotification
    {
        private Transaction _transaction;
        private readonly Guid _id = Guid.NewGuid();

        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            Log("Preparing");
            preparingEnlistment.Prepared();
        }

        public void Commit(Enlistment enlistment)
        {
            Log("Committing");
            _transaction = null;
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            Log("Rolling back");
            _transaction = null;
            enlistment.Done();
        }

        public void InDoubt(Enlistment enlistment)
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + "Doubting");
            _transaction = null;
            enlistment.Done();
        }

        public void DoWork()
        {
            Enlist();
            Log("Doing my thing...");
        }

        private void Enlist()
        {
            if (_transaction == null) //Not yet enlisted
            {
                Log("Enlisting in transaction");
                _transaction = Transaction.Current;
                _transaction.EnlistDurable(_id,this, EnlistmentOptions.EnlistDuringPrepareRequired);
                return;
            }
            if (_transaction == Transaction.Current) //Already enlisted in current transaction
            {
                return;
            }
            throw new InvalidOperationException("Already enlisted in other transaction");
        }
    }
}

输出:

Before commit (1)
Before dispose (1)
Preparing (6)
After dispose (1)
Committing (6)
TransactionCompleted event (7)
Before TransactionScope (1)
Inside TransactionScope (1)
Enlisting in transaction (1)
Doing my thing... (1)
Before commit (1)
Before dispose (1)
Preparing (7)
After dispose (1)
Before TransactionScope (1)
TransactionCompleted event (7)
Inside TransactionScope (1)
Committing (6)

Unhandled Exception: System.InvalidOperationException: Already enlisted in other transaction

【问题讨论】:

  • 你是怎么解决这个问题的?我也有同样的问题,也在使用 IBM MQ。
  • 我添加了我实现的代码作为答案。这绝不是一个明确的解决方案,但它是我能找到的最好的解决方案。

标签: c# .net transactions transactionscope distributed-transactions


【解决方案1】:

Transaction.Current.TransactionCompleted 总是在 worker.Commit 通知之后执行。添加一个 AutoResetEvent 来跟踪 TransactionCompleted 并在开始新循环之前等待它:

var transactionCompletedEvent = new AutoResetEvent(true); // true to start a first loop

while (true)
{
    transactionCompletedEvent.WaitOne(); // wait for previous transaction to finish

    Log("Before TransactionScope");
    using (var tx = new TransactionScope(TransactionScopeOption.Required, transactionOptions, EnterpriseServicesInteropOption.Full))
    {
        Log("Inside TransactionScope");
        Transaction.Current.TransactionCompleted += delegate 
        {
            transactionCompletedEvent.Set(); // allow a next loop to start
            Log("TransactionCompleted event"); 
        };
        worker.DoWork();
        Log("Before commit");
        tx.Complete();
        Log("Before dispose");
    }
    Log("After dispose");
}

【讨论】:

  • 你是对的。这确实解决了示例的问题,但是您的解决方案对我的问题程序没有帮助,其中“工作人员”是 IBM Websphere MQ 对象。所以我对此进行了进一步调查,我能发现的唯一区别是 MQ 将自身注册为 durable。这似乎会影响 TransactionScope 的行为,因为“TransactionCompleted”事件现在似乎在与提交不同的线程上引发,并且有时在实际调用提交之前收到。我在第一篇文章中更改了示例以证明这一点。
  • 您应该为不同的事务使用不同的 MQ 对象。持久登记意味着提交通知可能在几分钟或几小时内都不会到达。详情见 MSDN (msdn.microsoft.com/en-us/library/ms229975(v=vs.110).aspx): ...在此期间,资源管理器对事务的结果存在疑问。它不知道事务是提交还是中止。当资源管理器对事务有疑问时,它通过保持事务锁定来保持数据修改,从而将这些更改与任何其他事务隔离。
  • 好的,我可以同意。在每个事务中重新创建 MQ 连接会对性能造成巨大影响,但如果这样可以提高稳定性,我愿意接受。
  • 然而,一个问题仍然存在。我正在谈论的有问题的程序计划每 15 分钟运行一次。它应该处理某个队列中的所有消息,然后停止。问题仍然存在,如果主线程停止,所有排队的提交调用也会停止。更糟糕的是,一些提交甚至可能还没有被调用。您可以在上面的示例输出中看到:最后一次提交在再次在循环中运行时被调用。据我所知,没有办法检查它们是否被调用。
【解决方案2】:

或多或少有效的唯一解决方案是等到所有后台线程停止后再退出控制台应用程序。

在退出应用程序之前,我通过调用以下代码实现了这一点:

public static class ThreadTools
{ 
    /// <summary>
    /// Wait until all worker threads have finished their job.
    /// </summary>
    public static void WaitForBackgroundThreads()
    { 
        int workerThreads = 0;
        int completionPortThreads = 0;
        int maxWorkerThreads;
        int maxCompletionPortThreads;
        ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads); 
        while(workerThreads != maxWorkerThreads || completionPortThreads != maxCompletionPortThreads)
        { 
            Thread.Sleep(100);
            ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
        }
    }
}

我意识到这只是一个 hack,但在有人给我更好的解决方案之前,这是我能想到的最佳答案。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-13
    • 1970-01-01
    • 2018-10-20
    • 1970-01-01
    相关资源
    最近更新 更多