【问题标题】:Why Compensate method doesn't Call when a consumer thrown exception in MassTransit RouterSlip为什么当消费者在 MassTransit RouterSlip 中抛出异常时不调用 Compensate 方法
【发布时间】:2020-07-12 06:59:25
【问题描述】:

我已经在 saga 状态机中构建了一个路由表:

var builder = new RoutingSlipBuilder(NewId.NextGuid());
            var submitOrderUrl = QueueNames.GetActivityUri(nameof(SubmitOrderActivity));
            builder.AddActivity("SubmitOrder", submitOrderUrl, new
            {
                context.Message.OrderId
            });;
            builder.AddActivity("Payment", QueueNames.GetActivityUri(nameof(PaymentActivity)), new {
                context.Message.OrderId,
                context.Message.CustomerId,
                context.Message.Credit
            });
           
            builder.AddActivity("TakeProduct", QueueNames.GetActivityUri(nameof(TakeProductActivity)), new
            {
                context.Message.OrderId,
                Baskets
            });
            builder.AddVariable("OrderId", context.Message.OrderId);
            var routingSlip = builder.Build();
            await context.Execute(routingSlip);

我有 TakeProductActivity 活动: 公共类 TakeProductActivity : IActivity: ...

 public async Task<ExecutionResult> Execute(ExecuteContext<TakeProductArgument> context)
        {
            logger.LogInformation($"Take Product Courier called for order {context.Arguments.OrderId}");            
            var uri = QueueNames.GetMessageUri(nameof(TakeProductTransactionMessage));
            var sendEndpoint = await context.GetSendEndpoint(uri);
            await sendEndpoint.Send<TakeProductTransactionMessage>(new
            {
                ProductBaskets = context.Arguments.Baskets                
            });
             
            return context.Completed(new { Baskets = context.Arguments.Baskets, OrderId=context.Arguments.OrderId });
        }

当我使用 sendEndpoint.Send() 方法(触发并忘记)时,当服务中发生异常时,补偿方法不会自动激活, 但是当我使用 requestClient.GetResponse(request/reply) 方法调用服务时,当异常发生时自动调用了 Compensate 方法。 并且在 PaymentConsumer 中,当抛出异常时,必须为调用的付款方法进行补偿,但事实并非如此!

///this class has implemented in another micro-service hosted separate process:
public class TakeProductTransactionConsumer : IConsumer<TakeProductTransactionMessage>
....
 public async Task Consume(ConsumeContext<TakeProductTransactionMessage> context)
        {
            if(context.Message.ProductBaskets.Count>0)
             { 
                    throw new Exception("Process Failed!");
             }
            logger.LogInformation($"Take product called ");
         
            Dictionary<int, int> productCounts = new Dictionary<int, int>();
            foreach (var item in context.Message.ProductBaskets)
            {
                productCounts.Add(item.ProductId, item.Count);
            }
            var products = await productService.TakeProducts(productCounts);
            await publishEndpoint.Publish<ProductsUpdatedEvent>(new
            {
                ProductUpdatedEvents = products.Select(p =>new { ProductId = p.Id,p.Price,p.Count}).ToList()
            });
           
            
        }

问题是 MassTransit 无法从 rabbitMQ 获取异常并自动调用补偿方法。 当路由器滑动活动中抛出异常时,我应该如何告诉 MassTransit 调用补偿

【问题讨论】:

    标签: distributed masstransit saga routing-slip


    【解决方案1】:

    如果您的 Take Product Activity 使用 Send 对 Take Product 服务进行一劳永逸,并且该服务引发异常,则该 Activity 将永远不会知道它,因为它已经完成。即发即弃就是这样,在目标服务中没有观察到异常。

    如果你想让take product活动在take product服务抛出异常时失败,你需要使用request/response从服务中观察异常。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-04-30
      • 1970-01-01
      • 2019-04-16
      • 1970-01-01
      • 2022-10-14
      相关资源
      最近更新 更多