【问题标题】:Service Broker External Activation Runs App Multiple TimesService Broker 外部激活多次运行应用程序
【发布时间】:2011-01-14 12:01:47
【问题描述】:

我使用服务代理作为我的消息传递系统来安排和运行作业。 Eash 作业由称为引擎的多个任务或步骤组成。

我的服务代理对象是:

  • 消息类型:SubmitJob、JobResponse、SubmitTask、TaskResponse
  • 合同:JobContract、TaskContract
  • 队列:ClientQueue、JobQueue、EngineQueue、ExternalActivatorQueue
  • 服务:ClientService、JobService、EngineService、ExternalActivatorService
  • 事件通知:EventNotificationEngineQueue

我在作业队列上有内部激活(存储过程)。对于 SubmitJob MessageTypes,存储过程获取该作业的第一个任务,与 EngineService 启动一个对话并向该队列(StartTask)发送一条消息那么它们是否会被提交到 EngineQueue,如果没有,则此作业的任务已完成(发送消息并清理。)

这一切似乎都很好。但是,我想要一个处理 EngineQueue 消息的外部应用程序(引擎)。所以我使用的是微软的外部激活机制(ssbeas.exe)。花了很长时间,但我终于让它工作了。一条消息进入 EngineQueue,EventNotificationEngineQueue 启动我的应用程序并排空队列。到目前为止,一切都很好。但是,我的应用程序似乎运行了多次。我的测试应用程序配置为在完成时发送电子邮件。即使我只发送一项任务和一项任务,我也会收到多封电子邮件(表明程序运行了多次。)

这是我的应用程序 (vb.net) 的代码(broker 是一个封装服务代理服务(发送、接收等)的对象:

While True

            oBroker.tran = oBroker.cnn.BeginTransaction

            oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)

            If dialogHandle = System.Guid.Empty Then
                'Console.WriteLine("An Error Occurred. Program Terminated.")
                oBroker.tran.Commit()
                Exit While
            End If

            ConsoleWriteLine("Received: " & msgType)

            If (msg Is Nothing) Then
                ConsoleWriteLine("commiting and exiting")
                oBroker.tran.Commit()
                Exit While

            Else
                Select Case (msgType)
                    Case "SubmitTask"
                        ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
                        oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>'")

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
                        oBroker.EndDialog(dialogHandle)

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"""
                        oBroker.EndDialog(dialogHandle)

                End Select
            End If


            ConsoleWriteLine("commiting...")
            oBroker.tran.Commit()

        End While

我不明白为什么应用程序运行多次,但除此之外,我不明白为什么后续版本仍然能够看到队列中的消息。毕竟,第一个化身应该已经将消息锁定在队列中。它确实锁定了队列,因为我能够使用查询管理器进行测试,以在我的应用程序运行时尝试接收消息,但它被阻止了。

我尝试过使用 EAService.config 中的并发值。当我将其设置为 min="0" 和 max="1" 我确实将应用程序运行的次数减少到了两次以前,使用 min="0" 和 max="10",它运行起来似乎是 18 个副本。

我希望这是有道理的,对长度感到抱歉。有人知道这里发生了什么吗?我的 .net 应用程序编码有误吗?

谢谢 马丁

编辑:添加引擎运行后创建的日志:

2010-02-08 09:31:39 - 主要
2010-02-08 09:31:39 - 收到:提交任务
2010-02-08 09:31:39 - ProcessMsg
2010-02-08 09:31:39 - &lt;Task&gt;&lt;TaskID&gt; 5&lt;/TaskID&gt;&lt;TaskKey&gt;2&lt;/TaskKey&gt;&lt;/Task&gt;
2010-02-08 09:31:39 - DoWork
2010-02-08 09:31:39 - 发送电子邮件
2010-02-08 09:31:40 - 提交...
2010-02-08 09:31:40 - 睡觉
2010-02-08 09:32:10 - 睡眠完成。
2010-02-08 09:32:10 - 主要完成
2010-02-08 09:32:10 - 外部激活的应用程序成功并立即终止。
2010-02-08 09:32:10 - 主要
2010-02-08 09:32:10 - 收到:提交任务
2010-02-08 09:32:10 - ProcessMsg
2010-02-08 09:32:10 - &lt;Task&gt;&lt;TaskID&gt; 5&lt;/TaskID&gt;&lt;TaskKey&gt; 2&lt;/TaskKey&gt;&lt;/Task&gt;
2010-02-08 09:32:10 - DoWork
2010-02-08 09:32:10 - 发送电子邮件
2010-02-08 09:32:10 - 提交...
2010-02-08 09:32:10 - 睡觉
2010-02-08 09:32:40 - 睡眠完成。
2010-02-08 09:32:40 - 主要完成
2010-02-08 09:32:40 - 外部激活的应用程序成功并立即终止。

你可以看到它通过了整个应用程序两次(主要,接收,dowork,sendemail,完成。)

编辑 2:这是在将作业提交到队列时激活的存储过程(调试语句和所有)的最新版本:

ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN

    DECLARE     @message_type_name      sysname
    DECLARE     @dialog             uniqueidentifier 
    DECLARE     @message_sequence_number    bigint
    DECLARE     @error_message_sequence_number  bigint
    DECLARE     @message_body           xml
    DECLARE     @cgid               uniqueidentifier
    DECLARE     @JobID              int
    DECLARE     @Params             varchar(MAX)

    DECLARE     @ErrorNumber            bigint
    DECLARE     @ErrorText          nvarchar(MAX)

    DECLARE         @TaskID             int 
    DECLARE     @TaskService            varchar(100)
    DECLARE     @TaskKey            int
    DECLARE     @chEngine           uniqueidentifier
    DECLARE     @Step               int
    DECLARE     @NextStep           int
    DECLARE     @jobch              uniqueidentifier
    DECLARE     @EngineMsg          XML
    DECLARE     @TimeStarted            datetime
    DECLARE     @TaskStatus         int

    --  This procedure will just sit in a loop processing Task messages in the queue
    --  until the queue is empty
    SET NOCOUNT ON
    SET @error_message_sequence_number = -100


    PRINT 'pr_ProcessJob: Start'

    WHILE (1=1) BEGIN

        BEGIN TRY

            PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
            BEGIN TRANSACTION

            -- first lets get the conversation group id for the next message.
            WAITFOR (
                GET CONVERSATION GROUP @cgid FROM [JobQueue]
            ), TIMEOUT 1000

            IF (@@ROWCOUNT = 0) BEGIN

                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
                ROLLBACK TRANSACTION
                BREAK
            END

            PRINT @CGID

            -- Inner Loop (Message Processing)
            WHILE (1=1) BEGIN

                -- Receive the next available message
                PRINT 'Receiving Message.'
                WAITFOR (
                    RECEIVE top(1) -- just handle one message at a time
                        @message_type_name=message_type_name,  --the type of message received
                        @message_body=CAST(message_body AS XML),      -- the message contents
                        @message_sequence_number=message_sequence_number,
                        @dialog = conversation_handle    -- the identifier of the dialog this message was received on
                        FROM [JobQueue]
                        WHERE conversation_group_id=@cgid
                ), TIMEOUT 3000  -- if the queue is empty for three seconds, give up and go away

                -- If we didn't get anything, the queue is empty so bail out

                IF (@@ROWCOUNT = 0) BEGIN               
                    PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
                    BREAK
                END --IF (@@ROWCOUNT = 0)

                PRINT 'Message Received: ' + @message_type_name
                SAVE TRANSACTION MessageReceivedSavePoint

                -- Handle the End Conversation Message
                IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                    -- When we receive an End Dialog, we need to end also.
                    PRINT 'ENDING CONVERSATION'
                    END CONVERSATION @dialog
                END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                ELSE BEGIN
                    -- Handle the Conversation Error Message
                    IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
                        -- We can't return anything here because the dialog at the other end is closed so just log 
                        -- an error and close our end of the conversation.

                        PRINT 'ENDING CONVERSATION w/Error'
                        END CONVERSATION @dialog
                    END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                    ELSE BEGIN
                        IF (@message_type_name = 'SubmitJob')  BEGIN -- Process normal Job messages..   

                            PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'

                            -- Pull the information out of the task message with XQuery
                            SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
                                @Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')

                            PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
                            PRINT 'pr_ProcessJob::@Params = ' + @Params


                            SELECT  @ErrorNumber = 0, @ErrorText = N''

                            -- Do something with the job
                            -- save state

                            -- we are looking for the first step 
                            SET @Step=1

                            PRINT 'Selecting from JobTask'
                            ---------------------------------------------------------
                            -- Get the next task
                            ---------------------------------------------------------
                            SELECT  TOP 1
                                @TaskID=task.TaskID, 
                                @TaskService=tt.TaskService, 
                                @TaskKey =Task.TaskKey
                            FROM JobTask task INNER JOIN TaskType tt
                                ON task.TaskTypeID = tt.TaskTypeID      
                            WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
                            ORDER BY Task.step
                            ---------------------------------------------------------
                            PRINT 'Selecting from JobTask: complete'

                            PRINT 'Step='+cast(@step as varchar(max))               
                            PRINT 'TaskID='+cast(@TaskID as varchar(max))
                            PRINT 'TaskService='+cast(@TaskService as varchar(max))
                            PRINT'TaskKey='+cast(@TaskKey as varchar(max))                      

                            PRINT 'BEGIN DIALOG with ' + @TaskService   

                            BEGIN DIALOG @chEngine
                                FROM SERVICE [JobService]
                                TO SERVICE @TaskService
                                ON CONTRACT [TaskContract]
                                WITH RELATED_CONVERSATION=@dialog;

                            PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'

                            SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                            PRINT CAST(@EngineMsg as varchar(max))

                            PRINT 'Sending Message Type SubmitTask to Engine.';

                            SEND ON CONVERSATION @chEngine
                                MESSAGE TYPE SubmitTask
                                (@EngineMsg)

                            PRINT 'Inserting into jobstate'

                            INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)

                        END -- IF (@message_type_name = 'SubmitJob') 
                        ELSE BEGIN

                            IF (@message_type_name = 'TaskResponse')  BEGIN 

                                PRINT 'Processing MessageType TaskResponse'

                                SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')

                                PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))

                                PRINT 'Loading State'

                                --LoadState
                                SELECT  @JobID=jobid, 
                                    @Step=Step, 
                                    @jobch=jobch,
                                    @TimeStarted=sysdate    
                                FROM Jobstate
                                WHERE cgid=@cgid

                                PRINT 'Loading State complete' 

                                PRINT @jobch

                                PRINT 'Selecting from JobTask'
                                ---------------------------------------------------------
                                -- Get the next task
                                ---------------------------------------------------------
                                SELECT  TOP 1
                                    @TaskID=task.TaskID, 
                                    @TaskService=tt.TaskService, 
                                    @TaskKey =task.TaskKey,
                                    @NextStep = task.Step

                                FROM JobTask task INNER JOIN TaskType tt
                                    ON task.TaskTypeID = tt.TaskTypeID      
                                WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
                                ORDER BY Task.step
                                ---------------------------------------------------------
                                PRINT 'Selecting from JobTask: complete'

                                PRINT 'NextTask: ['+@TaskService+']'

                                if (@TaskService is null) BEGIN

                                    PRINT '@TaskService is NULL: BEGIN'

                                    -- no more tasks
                                    --END CONVERSATION @jobch

                                    PRINT 'Removing from state table'                               
                                    DELETE FROM JobState 
                                    WHERE @cgid=cgid
                                    PRINT @@ROWCOUNT
                                    PRINT 'Removing from state table-completed'

                                    DECLARE @ResponseDoc xml

                                    -- Send a response message saying we're done
                                    DECLARE @Time nvarchar(100)                 
                                    SET @Time = cast(getdate() as nvarchar(100))

                                    DECLARE @TimeStartedText nvarchar(100)
                                    SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))

                                    SET @ResponseDoc = N'<Job/>'
                                    SET @ResponseDoc.modify(
                                    'insert (<JobID>{ sql:variable("@JobID") }</JobID>, 
                                    <JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
                                    <ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
                                    <ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
                                    <TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>, 
                                    <TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>) 
                                    as last into /Job [1]'); 

                                    SEND ON CONVERSATION @jobch 
                                        MESSAGE TYPE [JobResponse] (@ResponseDoc)

                                    END CONVERSATION @jobch             

                                    PRINT '@TaskService is NULL: END'

                                END --if (@TaskService is null) BEGIN
                                ELSE BEGIN
                                    -- there are more tasks 
                                    PRINT '@TaskService is not null: BEGIN'

                                    PRINT 'BEGIN DIALOG with ' + @TaskService

                                    --another task
                                    BEGIN DIALOG @chEngine
                                        FROM SERVICE [JobService]
                                        TO SERVICE @TaskService
                                        ON CONTRACT [TaskContract]
                                        WITH RELATED_CONVERSATION=@dialog;

                                    SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                                    PRINT 'SEND ' +cast(@EngineMsg as varchar(max));

                                    SEND ON CONVERSATION @chEngine
                                    MESSAGE TYPE SubmitTask (@EngineMsg)                                    

                                    PRINT 'SAVING State: ' +str(@step)

                                    -- save state
                                    Update JobState
                                        SET step = @NextStep
                                    FROM JobState
                                    WHERE cgid=@cgid

                                    PRINT '@TaskService is not null: END'

                                END -- ELSE (@TaskService is NOT NULL)

                                PRINT 'Processing MessageType TaskResponse...Complete'

                            END -- IF (@message_type_name = 'TaskCompleted')

                        END -- ELSE IF (@message_type_name <> 'JobRequest') 
                    END -- ELSE  (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
            END -- WHILE (1=1) BEGIN

            PRINT 'COMMIT TRANSACTION'
            COMMIT TRANSACTION

        END TRY
        BEGIN CATCH

            --rollback transaction

            DECLARE @ErrNum int
            DECLARE @ErrMsg varchar(max)


                SELECT
                    ERROR_NUMBER() AS ErrorNumber
                    ,ERROR_SEVERITY() AS ErrorSeverity
                    ,ERROR_STATE() AS ErrorState
                    ,ERROR_PROCEDURE() AS ErrorProcedure
                    ,ERROR_LINE() AS ErrorLine
                    ,ERROR_MESSAGE() AS ErrorMessage;

            PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'


            if (error_number()=1205) BEGIN
                -- a deadlock occurred. We can try it again.
                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                ROLLBACK TRANSACTION
                --CONTINUE
            END --if (error_number()=1205)
            ELSE BEGIN
                if (error_number()=9617) BEGIN
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                    ROLLBACK TRANSACTION
                END
                ELSE BEGIN -- (error_number()<>9617) 
                    -- another error occurred.  The message cant be procesed sucessfully
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
                    ROLLBACK TRANSACTION MessageReceivedSavePoint

                END --ELSE (error_number()<>9617)       
            END -- if (error_number()<>1205)
        END CATCH

    END -- while loop

PRINT 'pr_ProcessJob: Complete' 

END -- CREATE PROCEDURE [dbo].[ProcessJobProc]

【问题讨论】:

  • 发送这些Task消息的逻辑是什么?
  • HI 你解决了这个问题我遇到了类似的问题 uswer 提交到队列 内部激活器进程然后使用 postnotification 发送到作业队列到从作业队列中提取的外部激活器,但一切都运行多次?

标签: service external activation service-broker


【解决方案1】:

如果您的应用程序的后续实例能够看到该消息,这仅意味着一件事:前一个实例必须回滚接收。乍一看,您提供的代码看起来不错,所以我会去寻找您正在使用的对象模型中的错误。如果其中一个方法抛出异常,应用程序将被终止,收到消息的事务自动回滚。

如果您希望一次只运行一个应用实例,请将 Max 设置保持为 1,否则默认情况下它将同时运行更多实例以跟上负载。

【讨论】:

  • 首先,感谢您的帮助。我还认为这是应用程序级别的某种异常/回滚,所以我开始记录到一个文件(我在我的问题中添加了一个日志示例,因为它不适合这里。)如您所见,它循环通过app 两次,包括 commit 语句。顺便说一句,它只有在外部激活时才会这样做。如果我在 VS 或命令行中运行该应用程序,它只会运行一次。我很困惑。再次感谢。
  • 您确定您实际上没有发送两条消息吗?发送这些任务消息的逻辑(代码)是什么?
  • 我有一个启用了内部激活的作业队列。存储过程根据作业和当前状态(已保存)获取下一个任务。任务根据类型进行处理并发送到适当的队列。 SP 太大了,无法在此处发布,但我会看看是否可以将其编辑到我的原始帖子中。
  • 最后一件事是从外部激活器的角度检查情况如何。如果您在 EA 的配置文件中启用以下跟踪选项,您将在跟踪文件中获得其操作的详细描述。 All LevelsAll ModulesAll Entities
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-05-15
  • 1970-01-01
  • 2012-11-16
  • 2015-12-10
  • 1970-01-01
相关资源
最近更新 更多