【问题标题】:How can I configure the topic name when using MassTransit SQS?使用 MassTransit SQS 时如何配置主题名称?
【发布时间】:2021-02-08 20:23:21
【问题描述】:

我使用 MassTransit SQS。我发现,当publishing a message时,会出现这个问题:

User is not authorized to perform: SNS:CreateTopic on resource

所以我需要创建主题的权限。

我通过要求管理员扩展权限解决了这个问题。可以创建此作品和主题。但是,似乎主题名称是根据类名自动生成的。我可以通过编程方式修改此行为吗(因此通过提供自我描述的主题名称)。

这对于创建 FIFO 主题很有用(其名称需要以 .fifo 结尾)。

更新

我现在熟悉formatting(感谢克里斯告诉我)和SetEntityName

但我的问题似乎是特定于 fifo 的。

这行得通:

                cfg.Message<CustomerUpdate>(x =>
                {
                    x.SetEntityName("customerupdate");
                });

但这不起作用:

                cfg.Message<CustomerUpdate>(x =>
                {
                    x.SetEntityName("customerupdate.fifo");
                });

它会导致异常并显示错误消息:

无效参数:主题名称

这种类型的:

Amazon.SimpleNotificationService.Model.InvalidParameterException

使用此堆栈跟踪:

Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionStream(IRequestContext requestContext, IWebResponseData httpErrorResponse, HttpErrorResponseException exception, Stream responseStream)
Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.<HandleExceptionAsync>d__2.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ExceptionHandler`1.<HandleAsync>d__6.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.ErrorHandler.<ProcessExceptionAsync>d__8.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CredentialsRetriever.<InvokeAsync>d__7`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorCallbackHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.MetricsHandler.<InvokeAsync>d__1`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Contexts.TopicCache.<CreateMissingTopic>d__9.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Contexts.TopicCache.<>c__DisplayClass6_0.<<Get>b__0>d.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
GreenPipes.Caching.Internals.PendingValue`2.<CreateValue>d__6.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<Declare>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<ConfigureTopology>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-AmazonSqsTransport-ClientContext>-Send>b__0>d.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
GreenPipes.PipeExtensions.<OneTimeSetup>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<GreenPipes-IFilter<MassTransit-AmazonSqsTransport-ClientContext>-Send>d__3.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe`1.<Send>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.TaskAwaiter.GetResult()<PublishAsync>d__2.MoveNext() in Publisher.cs: line: 18

这是我的代码

    public static void UseMassTransit(this IServiceCollection services, MassTransitConfiguration massTransitConfiguration)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<CustomerChangeConsumer>();
            x.UsingAmazonSqs((context, cfg) =>
            {
                cfg.Host(massTransitConfiguration.Host, h =>
                {
                    h.AccessKey(massTransitConfiguration.AccessKey);
                    h.SecretKey(massTransitConfiguration.SecretKey);

                    h.EnableScopedTopics();
                });

                cfg.ReceiveEndpoint("CustomerChangeConsumer",
                    configurator =>
                    {
                        configurator.ConfigureConsumer<CustomerChangeConsumer>(context);
                    });

                cfg.Message<CustomerUpdate>(x =>
                {
                    // This causes the exception when publishing:
                    x.SetEntityName("customerupdate.fifo");
                    // But this works. No issues when publishing
                    //x.SetEntityName("customerupdate");
                });
            });
        });
        
        services.AddMassTransitHostedService();
    }
}

【问题讨论】:

    标签: amazon-sqs masstransit


    【解决方案1】:

    您可以configure the message topology指定实体名称格式。

    例如,更改 OrderSubmitted 事件的主题名称:

    Bus.Factory.CreateUsingAmazonSqs(cfg =>
    {
        cfg.Message<OrderSubmitted>(x =>
        {
            x.SetEntityName("order-submitted.fifo");
        });
    
        cfg.Publish<OrderSubmitted>(x =>
        {
            x.TopicAttributes["FifoTopic"] = "true";
        });
    });
    

    要在发布时设置MessageGroupId,请使用:

    await bus.Publish<OrderSubmitted>(message, x => x.SetGroupId("..."));
    

    我不能 100% 确定在 Publish 和 Send 中能否幸存。

    【讨论】:

    • 谢谢,但这对我来说不是一个完整的解决方案,因为名称需要以“.fifo”结尾。使用 fifo 主题时,名称中的 fifo 扩展名是必需的。对于其他主题,您建议的解决方案可以正常工作。如果我使用您的解决方案,我会在尝试发布消息时收到此错误消息:Invalid parameter: Topic Name。你也知道 fifo 主题的解决方案吗?
    • 你就不能SetEntityName("order-submitted.fifo")吗?您还可以使用cfg.Publish&lt;OrderSubmitted&gt;(x =&gt; ....)为主题属性等配置主题。
    • 感谢您告诉我,但SetEntityName 真的不适合我。我更新了我的问题以澄清更多。我不确定在这种情况下如何使用cfg.Publish&lt;OrderSubmitted&gt;(x =&gt; ....),因为 x 参数在那里没有SetEntityName 方法。
    • 添加发布拓扑以添加基于阅读 SNS 文档的属性。
    • 谢谢!现在我收到一条新的错误消息,这通常意味着我离解决问题越来越近了:Invalid parameter: The MessageGroupId parameter is required for FIFO topics。你知道在哪里设置MessageGroupId吗?将其设置为主题属性并不能解决问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-08
    • 1970-01-01
    • 2016-03-31
    • 2016-11-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多