【问题标题】:MQTT send message from main threadMQTT 从主线程发送消息
【发布时间】:2019-11-20 10:30:18
【问题描述】:

我在 MqttHelper 类中实现了简单的 MQTT 订阅者,它可以正常工作并接收订阅。但是当我需要从主程序向服务器发送消息时我应该如何处理。我有方法 publish 可以从 IMqttActionListener 正常工作,但是如何在按钮按下事件时从主程序发送文本?

package com.kkk.mqtt.helpers;

import android.content.Context;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.io.UnsupportedEncodingException;

public class MqttHelper {
    public MqttAndroidClient mqttAndroidClient;

    final String serverUri = "tcp://tailor.cloudmqtt.com:16424";

    final String clientId = "ExampleAndroidClient";
    public final String subscriptionTopic = "sensor";

    final String username = "xxx";
    final String password = "yyy";



    public MqttHelper(Context context){
        mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);

        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean b, String s) {
                Log.w("mqtt", s);
            }

            @Override
            public void connectionLost(Throwable throwable) {

            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                Log.w("Mqtt", mqttMessage.toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });
        connect();
    }

    public void setCallback(MqttCallbackExtended callback) {
        mqttAndroidClient.setCallback(callback);
    }


    public void publish(String topic, String info)
    {


        byte[] encodedInfo = new byte[0];
        try {
            encodedInfo = info.getBytes("UTF-8");
            MqttMessage message = new MqttMessage(encodedInfo);
            mqttAndroidClient.publish(topic, message);
            Log.e ("Mqtt", "publish done");
        } catch (UnsupportedEncodingException | MqttException e) {
            e.printStackTrace();
            Log.e ("Mqtt", e.getMessage());
        }catch (Exception e) {
            Log.e ("Mqtt", "general exception "+e.getMessage());
        }

    }

    private void connect(){
        Log.w("Mqtt", "connect start " );
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());

        try {

            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener()
            {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.w("Mqtt", "onSuccess " );
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    subscribeToTopic();
                    publish(MqttHelper.this.subscriptionTopic,"information");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.w("Mqtt", "Failed to connect to: " + serverUri + exception.toString());
                }
            });


        } catch (MqttException ex){
            ex.printStackTrace();
        }
    }


    private void subscribeToTopic() {
        try {
            mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.w("Mqtt","Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.w("Mqtt", "Subscribed fail!");
                }
            });

        } catch (MqttException ex) {
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }
}

启动 MQTT 订阅者的代码:

private void startMqtt() {
    mqttHelper = new MqttHelper(getApplicationContext());
    mqttHelper.setCallback(new MqttCallbackExtended()
    {
        @Override
        public void connectComplete(boolean b, String s) {
            Log.w("Mqtt", "Connect complete"+ s );
        }

        @Override
        public void connectionLost(Throwable throwable) {
            Log.w("Mqtt", "Connection lost" );
        }

        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            Log.w("Mqtt", mqttMessage.toString());
            dataReceived.setText(mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            Log.w("Mqtt", "Delivery complete" );

        }
    });
    Log.w("Mqtt", "will publish");


}

【问题讨论】:

    标签: java android mqtt iot paho


    【解决方案1】:

    Paho 不在 UI 线程上运行,但它可能会异步回调 UI 线程。

    只需让ActivityFragment 实现MqttCallbackExtended 接口:

    public class SomeActivity extends AppCompatActivity implements MqttCallbackExtended { 
    
        ...
    
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            Log.d("Mqtt", "Connect complete > " + serverURI);
        }
    
        @Override
        public void connectionLost(Throwable cause) {
            Log.d("Mqtt", "Connection lost");
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.d("Mqtt", "Received > " + topic + " > " + message.toString());
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            Log.d("Mqtt", "Delivery complete");
        }
    }
    

    然后用SomeActivity 构造MqttHelper,因为它是MqttCallbackExtended listener

    public MqttHelper(Context context, MqttCallbackExtended listener) {
        this.mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
        this.mqttAndroidClient.setCallback(listener);
    }
    

    例如:

    this.mqttHelper = new MqttHelper(this);
    this.mqttHelper.setCallback(this);
    
    this.mqttHelper.publish("Java", "SomeActivity will handle the callbacks.");
    

    Application 中处理这些是有问题的,因为Application 没有UI,而Context 没有Theme。但是对于扩展ActivityFragmentDialogFragmentRecyclerView.Adapter 等的类,当想要与他们的 UI 交互时,实现回调interface 是有意义的。 p>


    供参考,MqttCallbackExtendedextendsMqttCallback

    【讨论】:

      【解决方案2】:

      另一种解决方案:

      1. 创建一个扩展android.app.ServiceMQTTService 类。
        Android 服务 类在主线程 中工作。所以如果你想使用另一个线程,你可以简单地使用MqttAsyncClient

      2. 您将使用回调方法在messageArrived()另一个线程(不是主线程)中自动接收来自代理的消息。

      3. 通过 EventBus 库将数据/命令从应用程序 UI(Activity-Fragment,...)简单地传递到 MQTTService

      4. 再次使用 messageArrived() 回调方法中的 EventBus 将接收到的数据从代理传递到应用程序的所需部分。
        请注意,在此步骤中,如果您的目标是应用程序 UI,则必须在目标中使用 @Subscribe(threadMode = ThreadMode.MAIN) 才能在主线程中获取数据。

      示例代码:

      public class MQTTService extends Service {
      
          private MqttAsyncClient mqttClient;
          private String serverURI;
      
           @Override
          public void onCreate() { 
              //do your initialization here
              serverURI = "tcp://yourBrokerUrlOrIP:yourBrokerPort";
              EventBus.getDefault().register(this);
          }
      
          @Override
          public int onStartCommand(Intent intent, int flags, int startId) {
            init();
            connect();
          }
      
      
      
          private void init() {
               mqttClient = new MqttAsyncClient(serverURI, yourClientId, new MemoryPersistence())
               mqttClient.setCallback(new MqttCallback() {
                  @Override
                  public void connectionLost(Throwable cause) {
                  }
      
                  @Override
                  public void messageArrived(String topic, MqttMessage message) throws Exception {
                     //now you will receive messages from the broker in another thread automatically (not UI Thread).
                     //You can do your logic here. for example pass the received data to the different sections of the application:
                     EventBus.getDefault().post(new YourPOJO(topic, message, ...));
                  }
      
                  @Override
                  public void deliveryComplete(IMqttDeliveryToken token) {
                  }
              });
          }
      
          private MqttConnectOptions getOptions(){
              MqttConnectOptions options = new MqttConnectOptions();
              options.setKeepAliveInterval(...);
              options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
              options.setAutomaticReconnect(true);
              options.setCleanSession(false);
              options.setUserName(...);
              options.setPassword(...);
              //options.setWill(...);
              //your other configurations
              return options;
          }
      
          private void connect() {
              try {
                  IMqttToken token = mqttClient.connect(getOptions(), null, new IMqttActionListener() {
                      @Override
                      public void onSuccess(IMqttToken asyncActionToken) {
                          //do works after successful connection
                      }
      
                      @Override
                      public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                      }
                  });
      
              } catch (MqttException e) {
                  e.printStackTrace();
              }
          }
      
          @Override
          public void onDestroy() {
              EventBus.getDefault().unregister(this);
              mqttClient.close();
              mqttClient.disconnect();
          }
      
          //this method receives your command from the different application sections
          //you can simply create different "MqttCommandPOJO" classes for different purposes
          @Subscribe
          public void receiveFromApp1(MqttCommandPOJO1 pojo1) {
              //do your logic(1). For example:
              //publish or subscribe something to the broker (QOS=1 is a good choice).
          }
      
          @Subscribe
          public void receiveFromApp2(MqttCommandPOJO2 pojo2) {
              //do your logic(2). For example:
              //publish or subscribe something to the broker (QOS=1 is a good choice).
          }
      
      }
      

      现在您可以在应用程序的每个部分中简单地接收从MQTTService 传递的数据。

      例如:

      public class MainActivity extends AppCompatActivity {
          ...
      
          @Subscribe(threadMode = ThreadMode.MAIN)
          public void receiveFromMQTTService(YourPojo pojo){
             //Do your logic. For example update the UI.
          }
      }
      

      其他链接:
      General instructions


      最好的祝福

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2015-01-09
        • 1970-01-01
        • 1970-01-01
        • 2018-05-05
        • 2015-12-07
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多