【问题标题】:NServiceBus with AzureStorageQueues not removing poison messages from input queue due to changing message properties由于更改消息属性,带有 AzureStorageQueues 的 NServiceBus 不会从输入队列中删除有害消息
【发布时间】:2020-04-22 13:35:39
【问题描述】:

我正在尝试一个新的 NServiceBus 项目,该项目利用 Azure 存储队列进行消息传输和 JSON 序列化,使用此处看到的自定义消息展开逻辑:

            var jsonSerializer = new Newtonsoft.Json.JsonSerializer();
            transportExtensions.UnwrapMessagesWith(cloudQueueMessage =>
            {
                using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
                using (var streamReader = new StreamReader(stream))
                using (var textReader = new JsonTextReader(streamReader))
                {
                    try
                    {
                        var jObject = JObject.Load(textReader);

                        using (var jsonReader = jObject.CreateReader())
                        {
                            // Try deserialize to a NServiceBus envelope first
                            var wrapper = jsonSerializer.Deserialize<MessageWrapper>(jsonReader);

                            if (wrapper.MessageIntent != default)
                            {
                                // This was a envelope message
                                return wrapper;
                            }
                        }

                        // Otherwise this was an EventGrid event
                        using (var jsonReader = jObject.CreateReader())
                        {
                            var @event = jsonSerializer.Deserialize<EventGridEvent>(jsonReader);

                            var wrapper = new MessageWrapper
                            {
                                Id = @event.Id,
                                Headers = new Dictionary<string, string>
                            {
                                { "NServiceBus.EnclosedMessageTypes", @event.EventType },
                                { "NServiceBus.MessageIntent", "Publish" },
                                { "EventGrid.topic", @event.Topic },
                                { "EventGrid.subject", @event.Subject },
                                { "EventGrid.eventTime", @event.EventTime.ToString("u") },
                                { "EventGrid.dataVersion", @event.DataVersion },
                                { "EventGrid.metadataVersion", @event.MetadataVersion },
                            },
                                Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event.Data)),
                                MessageIntent = MessageIntentEnum.Publish
                            };

                            return wrapper;
                        }
                    }
                    catch
                    {
                        logger.Error("Message deserialization failed, sending message to error queue");
                        throw;
                    }
                }
            });

自定义消息解包逻辑适用于格式正确的 JSON 消息,并且当将格式不正确的 JSON 消息放入输入队列时,自定义消息解包逻辑将在我创建 jObject 的 usings 内的第一行出错,这是预期的行为。但是,当自定义消息展开逻辑失败时,错误将被 MessageRetrieved 类中的逻辑捕获,该类是 NServiceBus.Azure.Transports.WindowsAzureStorageQueues NuGet 包 (v8.2.0) 的一部分,如下所示:

        public async Task<MessageWrapper> Unwrap()
        {
            try
            {
                Logger.DebugFormat("Unwrapping message with native ID: '{0}'", rawMessage.Id);
                return unwrapper.Unwrap(rawMessage);
            }
            catch (Exception ex)
            {
                await errorQueue.AddMessageAsync(rawMessage).ConfigureAwait(false);
                await inputQueue.DeleteMessageAsync(rawMessage).ConfigureAwait(false);

                throw new SerializationException($"Failed to deserialize message envelope for message with id {rawMessage.Id}. Make sure the configured serializer is used across all endpoints or configure the message wrapper serializer for this endpoint using the `SerializeMessageWrapperWith` extension on the transport configuration. Please refer to the Azure Storage Queue Transport configuration documentation for more details.", ex);
            }
        }

try catch 的第一行运行正确,将消息添加到配置的错误队列中,但是,当它这样做时,它似乎正在更改原始消息的消息 ID 和 popreceipt,如下所示:

Initial Message Values

Updated Message Values

然后,当下一行运行尝试从输入队列中删除原始消息时,它无法找到它,因为根据这篇文章 https://docs.microsoft.com/en-us/rest/api/storageservices/delete-message2#remarks 它需要原始消息 ID 和 pop 收据,它们现在已更改,导致以下内容抛出错误:

2020-04-20 14:17:58,603 WARN : Azure Storage Queue transport failed pushing a message through pipeline
Type: Microsoft.WindowsAzure.Storage.StorageException
Message: The remote server returned an error: (404) Not Found.
Source: Microsoft.WindowsAzure.Storage
StackTrace:
   at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Executor\Executor.cs:line 50
   at Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass7.<CreateCallbackVoid>b__5(IAsyncResult ar) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Util\AsyncExtensions.cs:line 121
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.AzureStorageQueues.MessageRetrieved.<Unwrap>d__3.MoveNext() in C:\BuildAgent\work\3c19e2a032c05076\src\Transport\MessageRetrieved.cs:line 40
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.AzureStorageQueues.MessagePump.<InnerReceive>d__7.MoveNext() in C:\BuildAgent\work\3c19e2a032c05076\src\Transport\MessagePump.cs:line 153
TargetSite: T EndExecuteAsync[T](System.IAsyncResult)

这是 NServiceBus 包逻辑的问题,还是我的自定义消息展开逻辑中的某些内容导致这些值发生变化?

【问题讨论】:

  • 正在以 1 的并发运行并确定它是相同的消息吗?尽管我使用了 8.0 版本的传输,但我不记得在摄取 EventGrid 事件时有任何问题。看起来你正在这样做。
  • 顺便说一句,rawMessage 的类型为 CloudQueueMessage。它不会更改消息 ID 或弹出回执。如果您有一个 GitHub 存储库,可以在其中重现此内容,请分享链接。
  • @SeanFeldman 不幸的是,我没有指向公共存储库的链接,但是在今天进行了更多挖掘之后,我注意到消息 id 和弹出回执的更新消息值与新消息的值匹配被添加到错误队列中,看起来好像在调用 CloudQueue.AddMessageAsync 时它正在替换传入的原始消息中的那些值。Microsoft 文档似乎与此匹配 - link
  • @SeanFeldman 如果是这种情况,那么它将解释为什么输入队列之后无法删除原始消息,因为它不再具有要匹配的原始消息的信息。

标签: c# azure nservicebus azure-storage-queues


【解决方案1】:

这是一个错误。当解包失败时,消息尚未通过处理管道。因此,正常可恢复性不适用。 CloudQueueMessage 需要被“克隆”并且克隆被发送到错误队列,而原始消息用于将其从输入队列中删除。我在 GitHub 上提出了bug issue,你可以在那里跟踪这个过程。

【讨论】:

    猜你喜欢
    • 2016-08-25
    • 1970-01-01
    • 2012-06-29
    • 1970-01-01
    • 2016-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-10-01
    相关资源
    最近更新 更多