【问题标题】:Active MQ Topic Consumer fails to consume some messagesActivemq Topic Consumer消费部分消息失败
【发布时间】:2014-12-14 07:01:01
【问题描述】:

我有一个生产者/消费者客户端,其主题是使用 Stomp 协议的 Active MQ。 我使用 Gozirra。

问题不在于对话另一端产生的所有消息 到达消费者客户端,如下所示。 一些消息成功到达,但其他消息未能进入。

我听说消息丢失是由于实现的消息传递的异步性质 基于消费者/生产者模型。

有人说使用收据机制可能会有所帮助。

你认为是什么问题?

================================================ ============================

代码如下

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.LoginException;
import com.lnisoft.ontocept.StringEscapeUtils;
import net.ser1.stomp.Client;
import net.ser1.stomp.Listener;
import android.os.AsyncTask;


// * Active MQ Stomp client using Gozirra

public class Communicator implements Listener 
{
private static Communicator instance;
private Client stomp_client = null;
private ConnectToOntoceptAsync connector = null;

private String emailAsUserIdentifier = "";
private String topicName = null;
private String messageToSend = "NO_MESSAGE";

private boolean isLoggedIn = false;
private StreamConverter converter = null;
private String errorMessage = "";
private boolean hasError = false;
public HashMap<String, MessageStack> perCommunicatorUserMessageStackTbl = new HashMap<String, MessageStack>();

public Communicator()
{
     converter = new StreamConverter();
}

public static Communicator getInstance()
{
     if ( instance == null )
     {
          instance = new Communicator();
     }

     return instance;
}

@Override 
public void message( Map headers, String body ) 
{ 
      String ascii_to_unicode = StringEscapeUtils.unescapeJava( body ); 

      if ( ((String)headers.get("sender")).contains("ontocept") )
      {
          storeMessageFromOntocept( ascii_to_unicode );
      }
      else
      {
          System.out.println("my message : "+ ascii_to_unicode);               
          System.out.println("\n"); 
      }
} 

public boolean getIsLoggedIn()
{
     return this.isLoggedIn;
}

public void initializeCommunicator( String _emailAsUserIdentifier ) throws LoginException, IOException
{       
    this.emailAsUserIdentifier = _emailAsUserIdentifier;

    topicName = "/topic/"+ this.emailAsUserIdentifier;

    connector = new ConnectToOntoceptAsync();
    connector.execute();
}

public boolean isIntialized() 
{
     if ( isLoggedIn == false )
     {
           return false;
     }
     else
     {
          return true;
     }
}

protected class ConnectToOntoceptAsync extends AsyncTask<String, Void, String> 
{       
    @Override
    protected String doInBackground( String... params )  
    {
        try 
        { 
             stomp_client = new Client( "***.***.**.***", 61613, emailAsUserIdentifier,  "1234" );  
             stomp_client.subscribe( topicName, Communicator.this  );
             isLoggedIn = true; 

        } 
        catch (LoginException e) 
        { 
            e.printStackTrace(); 
        } 
        catch (IOException e) 
        { 
            e.printStackTrace(); 
        } 

         return "Executed";
    }

    @Override
    protected void onPostExecute(String text) 
    {
          // TODO 
    }
}

public boolean hasError()
{
     return this.hasError;
}

public String getError()
{
     return this.errorMessage;
}

public void disconnect()
{
    stomp_client.unsubscribe( this.topicName );
    stomp_client.disconnect();
    this.isLoggedIn = false;
}

public void sendMessageToOntocept( String _messageText )
{
     this.messageToSend = _messageText;

     new SendAsyncMessageToOntocept().execute();
}

private class SendAsyncMessageToOntocept extends AsyncTask<String, Void, String>
{
    @Override
    protected String doInBackground( String... params )  
    { 
        String unicode_formatted_message = converter.convertToUnicodeText( messageToSend  ); 

        Map<String, String> header = new HashMap<String,String>();

        // * http://www.germane-software.com/software/Java/Gozirra/
        header.put( "type", "text/plain" );  
        header.put( "sender", "user" );

        stomp_client.send( topicName, unicode_formatted_message, header );

        try 
        { 
            Thread.sleep(2000); 
        } 
        catch (InterruptedException e) 
        { 
            e.printStackTrace(); 
        } 

        Thread.yield(); 

        return "Executed";
    }

    @Override
    protected void onPostExecute(String text) 
    { }
}

public boolean hasNewMessage( String _messageRecipientIdentifier )
{ 
      if ( _messageRecipientIdentifier.contains("quiz_activity") )
      {
            if ( perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
            {
                   return false;
            }
            else
            {
                 return perCommunicatorUserMessageStackTbl.get("quiz_activity").existNewMessage();
            }
      }
      else if ( _messageRecipientIdentifier.contains("vms_activity") )   // VisualizeMemorySpace 액티비티 약자
      {
            if ( perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
            {
                    return false;
            }
            else
            {
                   return perCommunicatorUserMessageStackTbl.get("vms_activity").existNewMessage();
            }
      }
      else  // _messageRecipientIdentifier = Ontocept_Activity
      { 
            if ( perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
            {
                   return false;
            }
            else
            {
                   return perCommunicatorUserMessageStackTbl.get("ontocept_activity").existNewMessage();
            }
      }
}

public Message getNewMessage( String _messageRecipientIdentifier )
{
      if ( _messageRecipientIdentifier.contains("quiz_activity") )
      {
            if ( perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
            {
                  return null;
            }
            else
            {
                 return perCommunicatorUserMessageStackTbl.get("quiz_activity").getNewMessage();
            }
      }
      else if ( _messageRecipientIdentifier.contains("vms_activity") ) 
      {
            if ( perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
            {
                  return null;
            }
            else
            {
                   return perCommunicatorUserMessageStackTbl.get("vms_activity").getNewMessage();
            }
      }
      else  // _messageRecipientIdentifier = Ontocept_Activity
      { 
            if ( perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
            {
                   return null;
            }
            else
            {
                   return perCommunicatorUserMessageStackTbl.get("ontocept_activity").getNewMessage();
            }
      }
}

public String convertToUnicodeText( String str )
{
     StringBuffer ostr = new StringBuffer();

     for ( int i=0; i<str.length(); i++) 
     {
            char ch = str.charAt(i);

            if ((ch >= 0x0020) && (ch <= 0x007e))   // Does the char need to be converted to unicode? 
            {
                    ostr.append(ch);                    // No.
            } 
            else                                    // Yes.
            {
                    ostr.append("\\u") ;                // standard unicode format.
                    String hex = Integer.toHexString(str.charAt(i) & 0xFFFF);   // Get hex value of the char. 
                    for(int j=0; j<4-hex.length(); j++) // Prepend zeros because unicode requires 4 digits
                        ostr.append("0");
                    ostr.append(hex.toLowerCase());     // standard unicode format.
                    //ostr.append(hex.toLowerCase(Locale.ENGLISH));
            }
        }

      return (new String(ostr));        //Return the stringbuffer cast as a string.
}

private void storeMessageFromOntocept( String messageObjectString ) 
{ 
      Message messageObject =   MessageObjectGenerator.getInstance().parseMessageStringIntoMessageObject( messageObjectString );

      if ( messageObject.getRecipient().contains("ontocept_activity") )
      {
            if ( this.perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
            {
                  MessageStack ms = new MessageStack();

                  ms.addMessage( messageObject );

                  this.perCommunicatorUserMessageStackTbl.put( "ontocept_activity", ms );
            }
            else
            {
                  this.perCommunicatorUserMessageStackTbl.get("ontocept_activity").addMessage( messageObject );
            }
      }
      else if ( messageObject.getRecipient().contains("vms_activity") )   
      {
            if ( this.perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
            {
                  MessageStack ms = new MessageStack();

                  ms.addMessage( messageObject );

                  this.perCommunicatorUserMessageStackTbl.put( "vms_activity", ms );
            }
            else
            {
                  this.perCommunicatorUserMessageStackTbl.get("vms_activity").addMessage( messageObject );
            }
      }
      else 
      { 
            if ( this.perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
            {
                  MessageStack ms = new MessageStack();

                  ms.addMessage( messageObject );

                  this.perCommunicatorUserMessageStackTbl.put( "ontocept_activity", ms );
            }
            else
            {
                  this.perCommunicatorUserMessageStackTbl.get("ontocept_activity").addMessage( messageObject );
            }
      }
}

public HashMap<String, MessageStack> getMessageStack()
{
     return this.perCommunicatorUserMessageStackTbl;
}
}

【问题讨论】:

    标签: java android activemq stomp


    【解决方案1】:

    这似乎是一个 Android 应用。所以要求应用程序始终处于活动状态,否则与服务器的通信会中断,导致消息丢失。

    您确定会从后台线程中调用 Listener 吗?要连接到服务器,您的 AsyncTask ConnectToOntoceptAsync 会运行并立即返回。在 Android 上,所有网络通信都必须发生在单独的线程上。在我看来,这看起来很可疑。

    我会在后台线程上进行所有通信,因此 Stomp 客户端是 AsyncTask 的私有字段。此外,我更喜欢使用阻塞接收方法,而不是使用 Listener 接口,它可以提供更详细的控制。

    【讨论】:

    • 哦,你说的很对。正如您所说,我在另一个上添加了 Receipt 流程,以确保其中发送的消息安全地到达客户端活动。由于客户端是移动设备,我怀疑订阅者客户端可能随时断开连接。所以我正在考虑 MQTT。还是谢谢你。
    【解决方案2】:

    我留下了我的代码的当前版本,它不会丢失来自另一个端点的消息。

    我已将客户端划分为订阅者,在会话期间仅实例化一次,

    另一个作为消息生产者,每次我需要发送消息时都会实例化

    并断开连接。我不知道这是否是正确的做法。

    但是直到现在,所有的消息(聊天消息)都被安全接收了。

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import javax.security.auth.login.LoginException;
    
    import com.lnisoft.ontocept.StringEscapeUtils;
    
    import net.ser1.stomp.Client;
    import net.ser1.stomp.Listener;
    import android.os.AsyncTask;
    
    // * Active MQ Stomp using Gozirra
    public class Communicator 
    {
    private static Communicator instance;
    private Client subscriber = null;
    private Client stomp_client = null;
    private ConnectToOntoceptAsync connector = null;
    private String emailAsUserIdentifier = "";
        private String topicName = null;
    
    
    private String messageToSend = "NO_MESSAGE";
    
    
    private boolean isLoggedIn = false;
    
    
    private StreamConverter converter = null;
    
    
    private String errorMessage = "";
    private boolean hasError = false;
    
    
    public HashMap<String, MessageStack> perCommunicatorUserMessageStackTbl = new HashMap<String, MessageStack>();
    
    
    public Communicator()
    {
         converter = new StreamConverter();
    }
    
    public static Communicator getInstance()
    {
         if ( instance == null )
         {
              instance = new Communicator();
         }
    
         return instance;
    }
    
    
    public boolean getIsLoggedIn()
    {
         return this.isLoggedIn;
    }
    
    public void initializeCommunicator( String _emailAsUserIdentifier ) throws LoginException, IOException
    {       
        this.emailAsUserIdentifier = _emailAsUserIdentifier;
    
    
        topicName = "/topic/"+ this.emailAsUserIdentifier;
    
        connector = new ConnectToOntoceptAsync();
        connector.execute();
    }
    
    public boolean isIntialized() 
    {
         if ( isLoggedIn == false )
         {
               return false;
         }
         else
         {
              return true;
         }
    }
    
    protected class ConnectToOntoceptAsync extends AsyncTask<String, Void, String> 
    {       
        @Override
        protected String doInBackground( String... params )  
        {
            try 
            { 
                 subscriber = new Client( "***.***.**.***", 61613, emailAsUserIdentifier,  "1234" ); 
    
                 subscriber.subscribe( topicName, new Listener() 
                 {
                        public void message( Map header, String body ) 
                        { 
                              String ascii_to_unicode = StringEscapeUtils.unescapeJava( body ); 
    
                              if ( ((String)header.get("sender")).contains("ontocept") )
                              {
                                  storeMessageFromOntocept( ascii_to_unicode );
                              }
                        }
                      } );
    
                      isLoggedIn = true;   
            } 
            catch (LoginException e) 
            { 
                e.printStackTrace(); 
            } 
            catch (IOException e) 
            { 
                e.printStackTrace(); 
            } 
    
             return "Executed";
        }
    
        @Override
        protected void onPostExecute(String text) 
        {
              // TODO 
        }
    }
    
    public boolean hasError()
    {
         return this.hasError;
    }
    
    public String getError()
    {
         return this.errorMessage;
    }
    
    public void disconnect()
    {
        subscriber.unsubscribe( this.topicName );
        subscriber.disconnect();
    
        this.isLoggedIn = false;
    }
    
    public void sendMessageToOntocept( String _messageText )
    {
         this.messageToSend = _messageText;
    
         new SendAsyncMessageToOntocept().execute();
    }
    
    private class SendAsyncMessageToOntocept extends AsyncTask<String, Void, String>
    {
        @Override
        protected String doInBackground( String... params )  
        { 
            synchronized ( this )
            {
                try 
                { 
                    if ( stomp_client != null )
                    {
                        if ( stomp_client.isConnected() == true )
                        {
                            stomp_client.disconnect();
                        }
                    }
    
                    stomp_client = new Client( "***.***.**.***", 61613, emailAsUserIdentifier,  "1234" );   
    
                    String unicode_formatted_message = converter.convertToUnicodeText( messageToSend  );  
    
                    Map<String, String> header = new HashMap<String,String>();
    
                    // * http://www.germane-software.com/software/Java/Gozirra/
                    header.put( "type", "text/plain" );  
                    header.put( "sender", "user" );
    
                    try 
                    { 
                        Thread.sleep(2000); 
                    } 
                    catch (InterruptedException e) 
                    { 
                        e.printStackTrace(); 
                    } 
    
                    stomp_client.send( topicName, unicode_formatted_message, header );
    
                    try 
                    { 
                        Thread.sleep(2000); 
                    } 
                    catch (InterruptedException e) 
                    { 
                        e.printStackTrace(); 
                    } 
    
                    stomp_client.disconnect();
    
                    Thread.yield(); 
                } 
                catch (LoginException e) 
                { 
                    e.printStackTrace(); 
                } 
                catch (IOException e) 
                { 
                    e.printStackTrace(); 
                } 
            }
    
    
    
    
            return "Executed";
        }
    
        @Override
        protected void onPostExecute(String text) 
        { }
    }
    
    
    public boolean hasNewMessage( String _messageRecipientIdentifier )
    { 
          if ( _messageRecipientIdentifier.contains("quiz_activity") )
          {
                if ( perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
                {
                       return false;
                }
                else
                {
                     return perCommunicatorUserMessageStackTbl.get("quiz_activity").existNewMessage();
                }
          }
          else if ( _messageRecipientIdentifier.contains("vms_activity") )   
          {
                if ( perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
                {
                        return false;
                }
                else
                {
                       return perCommunicatorUserMessageStackTbl.get("vms_activity").existNewMessage();
                }
          }
          else  // _messageRecipientIdentifier = Ontocept_Activity
          { 
                if ( perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
                {
                       return false;
                }
                else
                {
                       return perCommunicatorUserMessageStackTbl.get("ontocept_activity").existNewMessage();
                }
          }
    }
    
    public Message getNewMessage( String _messageRecipientIdentifier )
    {
          if ( _messageRecipientIdentifier.contains("quiz_activity") )
          {
                if ( perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
                {
                      return null;
                }
                else
                {
                     return perCommunicatorUserMessageStackTbl.get("quiz_activity").getNewMessage();
                }
          }
          else if ( _messageRecipientIdentifier.contains("vms_activity") )  
          {
                if ( perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
                {
                      return null;
                }
                else
                {
                       return perCommunicatorUserMessageStackTbl.get("vms_activity").getNewMessage();
                }
          }
          else  // _messageRecipientIdentifier = Ontocept_Activity
          { 
                if ( perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
                {
                       return null;
                }
                else
                {
                       return perCommunicatorUserMessageStackTbl.get("ontocept_activity").getNewMessage();
                }
          }
    }
    
    public String convertToUnicodeText( String str )
    {
         StringBuffer ostr = new StringBuffer();
    
         for ( int i=0; i<str.length(); i++) 
         {
                char ch = str.charAt(i);
    
                if ((ch >= 0x0020) && (ch <= 0x007e))   // Does the char need to be converted to unicode? 
                {
                        ostr.append(ch);                    // No.
                } 
                else                                    // Yes.
                {
                        ostr.append("\\u") ;                // standard unicode format.
                        String hex = Integer.toHexString(str.charAt(i) & 0xFFFF);   // Get hex value of the char. 
                        for(int j=0; j<4-hex.length(); j++) // Prepend zeros because unicode requires 4 digits
                            ostr.append("0");
                        ostr.append(hex.toLowerCase());     // standard unicode format.
                        //ostr.append(hex.toLowerCase(Locale.ENGLISH));
                }
            }
    
          return (new String(ostr));        //Return the stringbuffer cast as a string.
    }
    
    
    private void storeMessageFromOntocept( String messageObjectString ) 
    { 
          Message messageObject = MessageObjectGenerator.getInstance().parseMessageStringIntoMessageObject( messageObjectString );
    
          if ( messageObject.getRecipient().contains("quiz_activity") )
          {
                if ( this.perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
                {
                      MessageStack ms = new MessageStack();
    
                      ms.addMessage( messageObject );
    
                      this.perCommunicatorUserMessageStackTbl.put( "quiz_activity", ms );
                }
                else
                {
                      this.perCommunicatorUserMessageStackTbl.get("quiz_activity").addMessage( messageObject );
                }
          }
          else if ( messageObject.getRecipient().contains("vms_activity") )   
          {
                if ( this.perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
                {
                      MessageStack ms = new MessageStack();
    
                      ms.addMessage( messageObject );
    
                      this.perCommunicatorUserMessageStackTbl.put( "vms_activity", ms );
                }
                else
                {
                      this.perCommunicatorUserMessageStackTbl.get("vms_activity").addMessage( messageObject );
                }
          }
          else  
          {               
                if ( this.perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
                {
                      MessageStack ms = new MessageStack();
    
                      ms.addMessage( messageObject );
    
                      this.perCommunicatorUserMessageStackTbl.put( "ontocept_activity", ms );
                }
                else
                {
                      this.perCommunicatorUserMessageStackTbl.get("ontocept_activity").addMessage( messageObject );
                }
          }
    }
    
    public HashMap<String, MessageStack> getMessageStack()
    {
         return this.perCommunicatorUserMessageStackTbl;
    }
    

    }

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-10-04
      • 2015-11-25
      • 1970-01-01
      • 2021-03-06
      • 2021-04-02
      • 2020-01-17
      • 2023-01-27
      相关资源
      最近更新 更多