mqtt支持多少个topic_w5500 mqtt

mqtt支持多少个topic_w5500 mqtt先讲一下paho的mqtt连接的java实现情况 1、paho的mqtt底层是采用三个线程进行异步的消息发送、处理和接收的【debug的时候可以看到三个线程】,然后比较坑的是,在处理消息的时候,如果有运行是异常抛出但是没有处理的话,整个mqtt客户端直接断开连接。 2、然后就是底层paho提供了两个

mqtt支持多少个topic_w5500

先讲一下paho的mqtt连接的java实现情况

1、paho的mqtt底层是采用三个线程进行异步的消息发送、处理和接收的【debug的时候可以看到三个线程】,然后比较坑的是,在处理消息的时候,如果有运行是异常抛出但是没有处理的话,整个mqtt客户端直接断开连接。

2、然后就是底层paho提供了两个客户端连接实现——MqttClient和MqttAsyncClient。前者是同步的,后者是异步的,主要是把连接建立等耗时操作进行异步处理,一般使用方式为

  IMqttToken conToken;
  conToken = asyncClient.client.connect(conToken);
     ... do some work...
  conToken.waitForCompletion();

注:其实MqttClient底层也是采用的异步形式,主要是为了同之前的api兼容

3、最后mqtt的对于消息的处理是采用回调的方式,同时,对于收发消息可以采用注册监听器的方式进行进度的监听,具体使用可以参看paho项目的GitHub上的示例,上面有三个比较全的示例

4、关于MqttClientPersistence底下的两个类MemoryPersistence和MqttDefaultFilePersistence,主要是为了消息传送过程中的一个临时缓存,如Qos为1,2的消息

重连的思路

针对mqtt协议的原本用途——低网络质量环境,重连是必须的。目前的话重连有几种思路

1、在回调函数里面设置当mqtt客户端连接丢失时重新连接

2、在连接参数里面设置重连方法org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(boolean),个人推荐使用第二种方式

下面附上自己的采用第一种方式设置的mqtt客户端以及回调类

import java.io.UnsupportedEncodingException;
import java.util.List;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqttClient {
       String clientId;
	private MemoryPersistence persistence = new MemoryPersistence();

	// Private instance variables
	private MqttClient client;
	private String brokerUrl;
	private MqttConnectOptions conOpt;
	private boolean clean;
	private String password;
	private String userName;

	//需要重新订阅的主题
	private List<String> topicList;

	
	
	public MyMqttClient() {
		super();
	}



	public MyMqttClient(String brokerUrl, String clientId, boolean cleanSession, String userName, String password)
			throws MqttException {
		super();
		this.brokerUrl = brokerUrl;
		this.clientId = clientId;
		this.clean = cleanSession;
		this.password = password;
		this.userName = userName;
		// 建立mqtt连接属性
		this.conOpt = new MqttConnectOptions();
		this.conOpt.setConnectionTimeout(60);
//		this.conOpt.setKeepAliveInterval(60);
		this.conOpt.setCleanSession(true);
		// 初始化客户端
		this.client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
		this.client.setCallback(new MyMqttCallback(this));
	}

	
	
	public List<String> getTopicList() {
		return topicList;
	}


	public void setTopicList(List<String> topicList) {
		this.topicList = topicList;
	}



	public void connect() {
		try {
			if (!this.client.isConnected()) {
				this.client.connect(this.conOpt);
			}
		} catch (MqttException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public void subscribe(String topicName, int qos) {
		try {
			this.client.subscribe(topicName, qos);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	public void publish(String topicName, String message, int qos) {
		try {
			MqttMessage mqttMessage = new MqttMessage();
			mqttMessage.setQos(qos);
			mqttMessage.setPayload(message.getBytes("utf-8"));
			this.client.publish(topicName, mqttMessage);
		} catch (MqttException e) {
			e.printStackTrace();
		} catch (UnsupportedEncodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public void close() {
		try {
			this.client.disconnect();
			this.client.close();
		} catch (MqttException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public void reConnect() throws MqttSecurityException, MqttException {
		if (null != this.client) {
			if(!this.client.isConnected()) {
				client.connect(this.conOpt);
			}else {
				this.client.disconnect();
				this.client.connect(this.conOpt);
			}
		}
	}

}

 

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.util.Config;
import com.example.util.HikariCPUtil;

public class MyMqttCallback implements MqttCallback {
	
	private static final Logger logger = LoggerFactory.getLogger(MyMqttCallback.class);
	private static final ExecutorService pool = Executors.newFixedThreadPool(5);
	/**
	 * 私有化的mqtt客户端,用以重连
	 */
	private MyMqttClient myClient;

	public MyMqttCallback(MyMqttClient myClient) {
		super();
		this.myClient = myClient;
	}

	/**
	 * 设置重连机制
	 */
	@Override
	public void connectionLost(Throwable cause) {
		logger.error("连接丢失,原因{}",cause);
		// 连接丢失后,一般在这里面进行重连
		while (true) {
			try {
				Thread.sleep(30000);
				myClient.reConnect();
				List<String> topicList = this.myClient.getTopicList();
				for (String topic : topicList) {
					this.myClient.subscribe(topic, Config.QOS);
				}
				logger.info("mqtt重新连接,重新订阅!");
				break;
			} catch (Exception e) {
				e.printStackTrace();
				continue;
			}
		}
	}

	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {

                //消息处理
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		// TODO Auto-generated method stub
	}
	
}

 

关于重连之后的主题重新

主题重新订阅这个目前主要有两种实现方式,具体看需求

1、设置连接属性的MqttConnectOptions.setCleanSession(false),然后设置mqtt客户端的主题固定,重连上之后之前的主题保留,这个和mqtt的broker关系比较大

2、采用MqttCallbackExtended这个回调类,在org.eclipse.paho.client.mqttv3.MqttCallbackExtended.connectComplete(boolean, String)这个方法里面实现主题的重新订阅,这个一般结合org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(boolean)使用

3、像我上面的例子一样,在包装类里面缓存之前的topic,在短信重连成功的代码里面进行重新订阅即可

最后

代码都是一步步晚上,不要想着拿着我的代码就去用,能用,不保证会不会出什么bug的

今天的文章mqtt支持多少个topic_w5500 mqtt分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/49072.html

(0)
编程小号编程小号
上一篇 2023-09-03
下一篇 2023-09-03

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注