【问题标题】:Masstransit error handling for batch consumer批量消费者的 Masstransit 错误处理
【发布时间】:2021-09-22 18:16:03
【问题描述】:

我已经配置了一个消费者来接收一批消息。消费者也已配置为在发生任何错误时重试和重新交付。批处理消费者工作正常。但是我注意到,如果批处理消息中发生任何错误,整个批处理都会出错。

如下例所示,假设 Message[2] 出现故障,那么看起来整个批次在出现故障之前正在重试/重新交付。

我的问题:我们有什么方法可以配置消费者,以便只有批处理中的错误消息尝试重新传递或出现错误,并且批处理中的其他消息将恢复。 p>

public class MyConsumer : IConsumer<Batch<MyClass>>, IConsumer<Fault<MyClass>>
{
    public async Task Consume(ConsumeContext<Batch<MyClass>> context)
    {
        for (int i = 0; i < context.Message.Length; i++)
        {
            if (i == 2)
            {
                throw new Exception();
            }
        }
    }
    public async Task Consume(ConsumeContext<Fault<MyClass>> context)
    {
        Console.WriteLine($"Error in Context. Name :{context.Message.Message.Name}");
    }
}

【问题讨论】:

    标签: masstransit


    【解决方案1】:

    如果配置正确,整个批次将作为一个单元重试,这意味着您需要以另一种方式跟踪单个项目。一种方法是让每个项目成为一个单独的幂等操作,但此时真正的问题变成了“为什么要使用批处理进行离散操作?”

    如果您要批量处理每条消息,只需使用普通消费者即可,省去复杂性和麻烦。

    如果您出于某种原因必须使用批处理,请考虑捕获单个消息的异常并发布某种类型的事件,表明单个项目失败。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-05-06
      • 1970-01-01
      • 2023-01-10
      • 1970-01-01
      • 1970-01-01
      • 2021-12-02
      • 2021-04-24
      相关资源
      最近更新 更多