【问题标题】:Middleware with Masstransit publish带有 Masstransit 发布的中间件
【发布时间】:2019-10-09 06:11:47
【问题描述】:

我有带有MassTransit 的 .net 核心 WEB API 应用程序(用于实现 RabbitMQ 消息代理)。 RabbitMQ-MassTransit 配置很简单,只需几行代码即可在Startup.cs 文件中完成。

services.AddMassTransit(x =>
        {
            x.AddConsumer<CustomLogConsume>();

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://rabbitmq/"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ExchangeType = ExchangeType.Fanout;

                cfg.ReceiveEndpoint(host, "ActionLog_Queue", e =>
                {
                    e.PrefetchCount = 16;
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));
        });

我在我的项目解决方案中使用依赖注入以获得更好的代码标准。发布消息适用于控制器依赖注入。但是当我为日志操作实现自定义middle ware 时,Masstransit 无法正确发布消息,它在 RabbitMQ Web 控制台中创建了一个带有_error 的附加队列。

public class RequestResponseLoggingMiddleware
{
    #region Private Variables

    /// <summary>
    /// RequestDelegate
    /// </summary>
    private readonly RequestDelegate _next;

    /// <summary>
    /// IActionLogPublish
    /// </summary>
    private readonly IActionLogPublish _logPublish;

    #endregion

    #region Constructor
    public RequestResponseLoggingMiddleware(RequestDelegate next, IActionLogPublish logPublish)
    {
        _next = next;
        _logPublish = logPublish;
    }
    #endregion

    #region PrivateMethods

    #region FormatRequest
    /// <summary>
    /// FormatRequest
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    private async Task<ActionLog> FormatRequest(HttpRequest request)
    {
        ActionLog actionLog = new ActionLog();
        var body = request.Body;
        request.EnableRewind();

        var context = request.HttpContext;

        var buffer = new byte[Convert.ToInt32(request.ContentLength)];
        await request.Body.ReadAsync(buffer, 0, buffer.Length);
        var bodyAsText = Encoding.UTF8.GetString(buffer);
        request.Body = body;

        var injectedRequestStream = new MemoryStream();

        var requestLog = $"REQUEST HttpMethod: {context.Request.Method}, Path: {context.Request.Path}";

        using (var bodyReader = new StreamReader(context.Request.Body))
        {
            bodyAsText = bodyReader.ReadToEnd();

            if (string.IsNullOrWhiteSpace(bodyAsText) == false)
            {
                requestLog += $", Body : {bodyAsText}";
            }

            var bytesToWrite = Encoding.UTF8.GetBytes(bodyAsText);
            injectedRequestStream.Write(bytesToWrite, 0, bytesToWrite.Length);
            injectedRequestStream.Seek(0, SeekOrigin.Begin);
            context.Request.Body = injectedRequestStream;
        }

        actionLog.Request = $"{bodyAsText}";
        actionLog.RequestURL = $"{request.Scheme} {request.Host}{request.Path} {request.QueryString}";

        return actionLog;
    }
    #endregion

    #region FormatResponse
    private async Task<string> FormatResponse(HttpResponse response)
    {
        response.Body.Seek(0, SeekOrigin.Begin);
        var text = await new StreamReader(response.Body).ReadToEndAsync();
        response.Body.Seek(0, SeekOrigin.Begin);

        return $"Response {text}";
    }
    #endregion

    #endregion

    #region PublicMethods

    #region Invoke
    /// <summary>
    /// Invoke - Hits before executing any action. Actions call executes from _next(context)
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public async Task Invoke(HttpContext context)
    {
        ActionLog actionLog = new ActionLog();

        actionLog = await FormatRequest(context.Request);


        var originalBodyStream = context.Response.Body;

        using (var responseBody = new MemoryStream())
        {
            context.Response.Body = responseBody;

            await _next(context);

            actionLog.Response = await FormatResponse(context.Response);

            await _logPublish.Publish(actionLog);
            await responseBody.CopyToAsync(originalBodyStream);
        }
    }
    #endregion

    #endregion
}

在启动时配置中间件

  public async void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ............
        app.UseMiddleware<RequestResponseLoggingMiddleware>();
        ....................
    }

MassTransit 在启动时是否有任何其他配置可以与 Middle Ware 一起使用

编辑

IActionLogPublish

public interface IActionLogPublish
{
    Task Publish(ActionLog model);
}

ActionLogPublish

public class ActionLogPublish : IActionLogPublish
{

    private readonly IBus _bus;

    public ActionLogPublish(IBus bus)
    {
        _bus = bus;
    }

    public async Task Publish(ActionLog actionLogData)
    {
        /* Publish values to RabbitMQ Service Bus */

        await _bus.Publish(actionLogData);

        /* Publish values to RabbitMQ Service Bus */
    }

}

编辑

RabbitMQ Web 控制台

【问题讨论】:

  • 一些问题......日志中的任何类型的错误,或任何被推送到 _error 队列的错误。您还可以分享 IActionLogPublish 的代码吗
  • 您到底遇到了什么错误?尝试在捕获所有异常的情况下运行它..
  • 另外,IOC容器如何解析delegate_next?我假设中间件的生命周期也由 IOC 容器管理

标签: c# asp.net asp.net-mvc asp.net-core rabbitmq


【解决方案1】:

中间件需要将原始正文放回响应中。

注入的依赖也可以在控制器而不是中间件上正常工作,因为它可能在作用域生命周期中注册。

在这种情况下,它不应该是构造函数注入到 middlewre 而是直接注入到Invoke

因为中间件是在应用启动时构建的,而不是按请求构建的,所以中间件构造函数使用的 范围 生命周期服务不会在每个请求期间与其他依赖注入类型共享。如果您必须在中间件和其他类型之间共享范围服务,请将这些服务添加到 Invoke 方法的签名中。 Invoke 方法可以接受由 DI 填充的附加参数:

//...omitted for brevity

public RequestResponseLoggingMiddleware(RequestDelegate next) {
    _next = next;
}

//...

private async Task<string> FormatResponseStream(Stream stream) {
    stream.Seek(0, SeekOrigin.Begin);
    var text = await new StreamReader(stream).ReadToEndAsync();
    stream.Seek(0, SeekOrigin.Begin);
    return $"Response {text}";
}

public async Task Invoke(HttpContext context, IActionLogPublish logger) {
    ActionLog actionLog = await FormatRequest(context.Request);
    //keep local copy of response stream
    var originalBodyStream = context.Response.Body;

    using (var responseBody = new MemoryStream()) {
        //replace stream for down stream calls
        context.Response.Body = responseBody;

        await _next(context);

        //put original stream back in the response object
        context.Response.Body = originalBodyStream; // <-- THIS IS IMPORTANT

        //Copy local stream to original stream
        responseBody.Position = 0;
        await responseBody.CopyToAsync(originalBodyStream);

        //custom logging
        actionLog.Response = await FormatResponse(responseBody);
        await logger.Publish(actionLog);
    }
}

参考Dependency injection in ASP.NET Core: Scoped Service lifetime

在中间件中使用作用域服务时,将服务注入InvokeInvokeAsync 方法。 不要通过构造函数注入进行注入,因为它会强制服务表现得像单例。如需更多信息,请参阅Write custom ASP.NET Core middleware

强调我的

【讨论】:

    【解决方案2】:

    很难从描述中看出您到底遇到了什么错误。中间件实现看起来很复杂,它可能是错误的根源。我猜你没有正确设置流位置或其他东西。 @Nkosi 的更正实际上可能会修复它。

    如果您说 IBus 可以在根据请求创建的控制器中正常工作,您可能需要尝试在中间件中实现 IMiddleware 接口,如 doc 中所述。

    public class RequestResponseLoggingMiddleware : IMiddleware
    {
        IActionLogPublish logPublish;
    
        public RequestResponseLoggingMiddleware(IActionLogPublish logPublish)
        {
            this.logPublish = logPublish;
        }
    
        // ...
    
        public async Task InvokeAsync(HttpContext context, RequestDelegate next)
        {
            //...
        }
    
        //...
    }
    

    在这种情况下,中间件将注册为作用域或临时服务,并为每个请求解析,与控制器相同。如果它与范围服务解决方案有关,这也可以解决您的问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多