【问题标题】:How to dequeue messages that were enqueued when app was offline (Oracle Advanced Queue)当应用程序离线时如何使入队的消息出列(Oracle 高级队列)
【发布时间】:2012-02-26 07:22:13
【问题描述】:

我有 2 个问题。以下是场景——

有2个不同的进程进程A和进程B。 Process A enqueue 是消息队列中的消息。 进程 B 从消息队列中取出消息。

1) 进程 B 关闭了一段时间,但进程 A 继续将消息排入队列。当进程 B 重新上线时,当进程 B 离线时,如何将进程 A 发布的消息队列中的消息出列?

2) 我使用的队列是多个消费者队列,因为需要有超过 1 个进程 B 才能使消息出队。设计背后的原因是,如果一个进程 B 死亡,另一个进程 B 仍然可以继续处理消息。同时,如果进程 B 的 1 个实例接收到一条消息,它应该通知其他进程 B 不处理该消息。

我找不到任何样本。任何帮助是极大的赞赏。

【问题讨论】:

    标签: database oracle queue message-queue advanced-queuing


    【解决方案1】:

    我刚刚完成了一个要求非常相似的项目。

    问题 1) 我创建了一个 Windows 服务计时器,它调用 WCF Restful 服务来定期运行。然后,WCF 服务会将排队的任何内容出列(每次调用最多 500 条消息)。任何排队的东西都应该按顺序自动处理,所以即使这个计时器在重新启动后停止,它也会从停止的地方继续。

    问题 2) 我正在将数据从 Oracle 复制到 CouchBase,因此当进程开始时我有一个用于检索的时间戳,以及一个用于 CouchBase 中已保存数据的时间戳,如果第一个比后者旧,则它不会保存。 (这是为了照顾比赛条件)。

    在 Oracle 中,我还有一个触发器,当某些东西入队时,它会将 id 和入队时间复制到第二个表。定期检查第二个表,如果一个项目已在队列表中出列,但第二个表尚未更新以反映 WCF 服务在特定时间范围内的这一点,它将重新排队数据,因为该过程中出现故障.

    如果有帮助,这里有一个使用 odp.net 的 wcf restful 服务示例。

    OracleAQQueue _queueObj;
    OracleConnection _connObj;
    _connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString();
    _connObj = new OracleConnection(_connString);
    _queueObj = new OracleAQQueue("QUEUENAME", _connObj);
    _connObj.Open();
    
      int i = 0;
      bool messageAvailable = true;
    
      while (messageAvailable && i < 500)
      {
        OracleTransaction _txn = _connObj.BeginTransaction();
        //Makes dequeue part of transaction
        _queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
        _queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME"
        try
        {
             //Wait  number of seconds for dequeue, default is forever
             _queueObj.DequeueOptions.Wait = 2;
             _queueObj.MessageType = OracleAQMessageType.Raw;
             _queueObj.DequeueOptions.ProviderSpecificType = true;
             OracleAQMessage _depMsq = _queueObj.Dequeue();
             var _binary = (OracleBinary)_depMsq.Payload;
             byte[] byteArray = _binary.Value;
             _txn.Commit();
         }
         catch (Exception ex)
         {
             //This catch will always fire when all messages have been dequeued
             messageAvailable = false;
             if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1)
                {
                 //Actual error present. 
                 log.Info("Problem occurred during dequeue process : " + ex.Message);
                }
         }
      }
    
        _queueObj.Dispose();
        _connObj.Close();
        _connObj.Dispose();
        _connObj = null;
    

    【讨论】:

      猜你喜欢
      • 2016-01-20
      • 2015-08-10
      • 1970-01-01
      • 1970-01-01
      • 2016-11-25
      • 1970-01-01
      • 1970-01-01
      • 2012-12-08
      相关资源
      最近更新 更多