paho mqtt
遇到诸如“此应用程序只需要向另一个服务器发送值”之类的问题时,总会有一种诱惑,将其简化为打开套接字并发送值之类的事情。 但是,这个简单的主张很快就在生产中瓦解了。 除了必须编写系统的服务器端之外,开发人员还必须应对以下事实:网络不是100%可靠的,而围绕我们的无线和移动网络在设计上并不可靠,因此很可能需要访问控制和加密。
编写代码来解决这个问题,最终会遇到更加复杂,难以测试的例程,这些例程很难证明它们将遇到的极端情况。 更糟糕的是,复杂性的增加并没有增加功能或互操作性。 面对所有这些,从一个可互操作的,功能强大的协议开始就更好了,该协议已经解决了所有这些问题? 这就是MQTT(MQ遥测传输)的出现。
为什么选择MQTT?
MQTT来自M2M (机器对机器)和物联网的世界。 在那里,设备的大小可以与通过无线系统连接的传感器和控制器一样小。 这种环境促使在代码占用量和系统负载方面,任何协议的实现都必须轻量级,同时还要解决该可变可靠性连接问题。
MQTT最初是由IBM的Andy Stanford-Clark和Arcom的Arlen Nipper(后来由Eurotech收购)创建的,作为企业消息传递系统的补充,以便可以安全,轻松地将企业外部的大量数据带入企业内部。 MQTT是一种发布/订阅消息传递系统,它使客户端可以发布消息而无需担心自己的最终目的地。 消息被发送到MQTT代理,在其中可以保留它们。 消息的有效负载只是一个字节序列,最大为256MB,对这些有效负载的格式没有任何要求,并且MQTT协议通常向大多数消息添加两个字节的固定标头。
其他客户端可以订阅这些消息,并在新消息到达时由代理进行更新。 为了考虑使用MQTT的各种可能情况,它允许客户和经纪人根据消息的内容设置从“开火即忘”到“确认交付”的“服务质量”。 MQTT还具有非常轻便的API,具有所有五种协议方法,使其易于学习和调用,但是还支持SSL加密的连接以及客户端到代理的用户名/密码身份验证。
自首次亮相以来,MQTT已在生产场景中证明了自己。 与独立的MQTT代理一样,它也已集成到其他消息队列代理中,例如ActiveMQ和RabbitMQ,提供了到企业网络的桥梁。 MQTT 3.1规范的最新版本被用作OASIS消息遥测标准的基础,为了保持兼容性,该基础与MQTT规范相差甚远。
为什么选择帕霍?
MQTT是一种协议,协议需要客户端实现。 Eclipse Paho项目是Eclipse Foundation M2M任务的一部分,该任务旨在提供M2M库和工具的高质量实现。 在Paho旗帜下,正在策划和开发MQTT的开源客户端库。 在不同的开发阶段,已经有带有Lua,Python,C ++和JavaScript的MQTT C和Java库。 在本文中,我们将展示如何使用Paho Java MQTT库进行发布和订阅。
深入研究MQTT
要开始考虑代码中的MQTT,这是MQTT API的最简单用法:
client = new MqttClient("tcp://localhost:1883", "pahomqttpublish1"); client.connect(); MqttMessage message = new MqttMessage(); message.setPayload("A single message".getBytes()); client.publish("pahodemo/test", message); client.disconnect();
在此代码段中,我们通过TCP到端口1883(MQTT的默认端口)的连接,创建了到本地主机上运行的MQTT代理的客户端连接。 客户需要具有一个标识符,该标识符对于连接到代理的所有客户都是唯一的–在这种情况下,我们为客户提供一个pahomqttpublish1的ID。 然后,我们告诉客户端进行连接。 现在我们可以创建一个MqttMessage并将其有效载荷设置为一个简单的字符串。 注意,我们将字符串转换为字节,因为setPayload仅采用字节数组。 我们依靠MqttMessage的默认设置来设置其他各种参数。 接下来,我们发布消息,并且在这里我们需要介绍主题。
为了避免每个客户端都收到由其他客户端发布的每条消息这一明显问题,MQTT消息将与主题一起发布 。 主题是一个结构化的字符串,它使用“ /”定义了命名空间中的位置,用于分隔该命名空间层次结构的级别。 主题可以是例如“ / pumpmonitor / pumps / 1 / level”或“ / stockmarket / prices / FOO” 。 开发人员可以根据自己正在处理的任务来设计主题的结构。 客户端发布到绝对主题没有任何歧义,但是他们可以使用通配符订阅主题以聚集消息。 “ +”表示隐式层次结构的一个级别,而“#”表示从该点开始的所有树。 给定前面的示例,可以为泵1的级别订阅“ pumpmonitor / pumps / 1 / level” ,为所有泵级别订阅“ pumpmonitor / pumps / + / level” ,甚至对于所有泵活动都订阅“ pumpmonitor / pumps /#” 。
在简短的代码段中,我们已将其发布到“ pahodemo / test” 。 最后,我们断开与代理的连接,并完成了MQTT会话。 但是我们可以将消息发布到哪里?
获得经纪人
MQTT中的代理处理接收已发布的消息并将其发送给已订阅的任何客户端。 在我们的简短示例中,我们连接到在本地系统上运行的代理。 尽管有很多代理,但是Mosquitto代理是最简单的配置和运行,仅用于MQTT工作。 它也是开源的,因此您可以下载它并在自己的系统上运行它,无论是Windows,Mac OS X,Linux还是许多其他平台。 Mosquitto经纪人代码也作为新项目的一部分被添加到Eclipse中。
Eclipse Foundation对Mosquitto并不陌生-它在m2m.eclipse.org上将Mosquitto的公共实例作为MQTT沙箱运行,因此,如果您无法下载并运行自己的Mosquitto服务器,则可以将示例中的连接URI更改为“ tcp: //m2m.eclipse.org:1883“。 请记住,这是一个共享的沙箱,因此发布到本文中使用的主题的人很可能会被阅读本文并运行示例的其他人覆盖。
Mosquitto的默认配置意味着将其设置为不使用用户名/密码身份验证,并接受端口1883上的所有连接。它还带有两个客户端mosquitto_pub和mosquitto_sub ,后者在调试应用程序时非常有用。 运行:
mosquitto_sub -t "#" -v
会将所有新消息转储到代理。 请记住该主题周围的引号,尤其是在Unix上使用“#”通配符时,无论是未引号还是未转义的引号,都表示注释的开始,并且会看到其余命令被丢弃。 如果让该命令运行,并在另一个窗口中运行‘mosquitto_pub -t“ mosquittodemo / test” -m“ Hi”’,那么您应该看到mosquitto_sub会话列出了该消息。 现在,我们可以在某个地方发布内容,因此让我们运行该代码。
在IDE中
为了使我们的代码片段运行,我们将使用Eclipse Maven支持来处理依赖关系。 创建一个新的Java项目,然后选择配置→转换为Maven项目。 首先,由于Paho MQTT代码尚未在Maven Central中(尚未),我们需要包括其存储库–打开pom.xml文件,并在</ version>之后添加
<repositories> <repository> <id>paho-mqtt-client</id> <name>Paho MQTT Client</name> <url>https://repo.eclipse.org/content/repositories/paho-releases/</url> </repository> </repositories>
然后,我们需要为Mqtt-client代码添加依赖项。 仍然在pom.xml文件中,但是这次,在</ build>之后,添加
<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> <packaging>jar</packaging> <version>0.4.0</version> </dependency> </dependencies>
保存pom.xml并创建一个新的Java类PahoDemo 。 基本上,这是较早地包装代码段所需的Java代码,并且应如下所示:
package org.eclipse.pahodemo; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; public class PahoDemo { MqttClient client; public PahoDemo() {} public static void main(String[] args) { new PahoDemo().doDemo(); } public void doDemo() { try { client = new MqttClient("tcp://localhost:1883", "pahomqttpublish1"); client.connect(); MqttMessage message = new MqttMessage(); message.setPayload("A single message".getBytes()); client.publish("pahodemo/test", message); client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } }
并将其作为Eclipse中的Java应用程序运行。 如果您仍在运行mosquitto和mosquitto_sub ,则应该看到:
pahodemo/test A single message
出现。 现在,我们已经运行了一个基本的Paho MQTT发布客户端,我们可以开始探索各种可用的选项。
讯息选项
MQTT中的每个消息都可以具有其服务质量并保留标志。 服务质量会建议代码是否以及如何确保消息到达。 有三个选项,0(最多一次),1(最少一次)和2(恰好一次)。 默认情况下,新的消息实例设置为“至少一次”,服务质量(QoS)为1,这意味着发件人将至少发送一次消息,并且如果没有确认,它将继续发送它会设置一个重复的标志,直到出现确认为止,此时客户端将从其持久的消息集中删除该消息。
QoS为0,“最多一次”是最快的模式,客户端不等待确认。 当然,这意味着如果断开连接或服务器出现故障,则消息可能会丢失。 规模的另一端是2的QoS,“精确一次”,它使用两对交换,首先传输消息,然后确保仅接收到一个副本并对其进行处理。 这确实使“精确一次”设置的速度较慢,但最可靠。
默认情况下, MqttMessage的保留标志设置为false。 这意味着代理将不会保留该消息,以便在发送消息之后到达的所有订阅者都不会看到该消息。 通过设置保留标志,消息将由代理保留,因此,当晚到者连接到代理或客户端创建新的订阅时,它们将获得所有相关的保留消息。
连接选项
连接到代理时,可以设置许多选项,这些选项封装在MqttConnectOptions类中。 这些包括用于保持与代理的连接的保持活动间隔,用于传递消息的重试间隔,连接超时时间,清除会话标志,连接的意愿,以及对于Java的代码, SocketFactory使用该间隔 。
如果我们修改客户端,使其显示为:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; ... MqttConnectOptions options; ... client = new MqttClient("tcp://localhost:1883", "pahomqttpublish2"); options = new MqttConnectOptions(); client.connect(options);
我们可以尝试使用连接选项。 对于此示例,有趣的选项是clean标志和will。 当发送的QoS大于0的消息时,需要采取步骤以确保客户端重新连接时,它不会重复消息并恢复与代理的先前会话。 但是,如果要确保在连接和断开连接时都丢弃所有状态信息,则可以将clean session标志设置为true。 经纪人如何识别您可能问的客户? 客户端ID就是答案,这也是您需要确保客户端ID不同的原因。
遗嘱选项允许客户为最坏的情况做准备。 尽管被称为遗嘱,但它更像是“给我的信,以防万一我发生可疑的事情”。 遗嘱包含一条消息,如果客户端消失而没有干净地关闭连接,则该消息将由代理发送。 像普通消息一样,有一个主题,有效负载,QoS设置和保留标志。 因此,如果我们要通过发送未保留但有保证的交付消息来记录失败的客户端,则可以将代码更改为:
options = new MqttConnectOptions(); options.setWill("pahodemo/clienterrors", "crashed".getBytes(),2,true); client.connect(options);
运行代码,您将不会发现任何更改。 如果要对此进行测试,请插入System.exit(1);。 在client.disconnect之前模拟异常终止。 我们现在很高兴发送消息,但是我们不知道何时发送消息,还没有订阅主题。
投放回调
监听Java API中的MQTT活动的核心是MqttCallback接口。 它允许API在消息到达,消息传递完成或连接断开时调用我们指定的代码。 如果将实现MqttCallback添加到PahoDemo类声明中,则Eclipse IDE将帮助我们添加所需的导入并提供实现所需方法的方法:
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; public void deliveryComplete(IMqttDeliveryToken token) {} public void messageArrived(String topic, MqttMessage message) throws Exception {} public void connectionLost(Throwable cause) {}
现在,我们需要做的就是通过添加client.setCallback(this);来告诉MqttClient我们已经做到了。 在使用它连接到代理之前。 有了这个,让我们看看何时调用这些方法。
当按照其服务质量设置完全传递完一条消息后,将调用deliveryComplete()回调。 这意味着,对于QoS为0,当消息已写入网络时,对于QoS为1,当已确认消息发布时,对于QoS为2,当不仅已确认但确认消息发布时,是已传递邮件的唯一副本。
由于存在回调,开发人员可能想知道publish方法是异步的还是阻塞的。 答案是它可以由MqttClient设置timeToWait控制 。 这设置了客户端将任何操作返回控制权到应用程序其余部分之前等待的时间(以毫秒为单位)。 默认情况下,此值设置为-1 ,表示永不超时,直到完成才阻塞。 如果代码称为client.setTimeToWait(100); 那么如果调用花费的时间少于100毫秒,在100毫秒后或者断开连接或关闭,则任何调用都会在应用程序完成后立即将控制权返回给应用程序。 调用client.getPendingDeliveryTokens()将返回一个令牌数组,其中包含有关当前“正在进行中”的消息的信息。 无论采用哪种方式设置timeToWait ,在进行传递时仍将调用deliveryComplete()方法。
订阅内容
messageArrived()回调方法是任何订阅的主题收到消息时都会调用的方法。 MqttClient的subscription ()和unsubscribe()方法设置我们感兴趣的主题消息。最简单的版本是client.subscribe(“ topicfilter”) ,它将订阅的服务质量默认设置为1。 我们当然可以设置QoS- client.subscribe(“ topicfilter”,qos) -或订阅一个过滤器数组和一个可选的QoS值数组以与它们一起使用。 顺便说一句,QoS设置是最大值,因此,如果您已预订QoS为1,则以QoS为0或1发布的消息将以该QoS传递,而QoS为2发布的消息将以QoS传递。 QoS为1。
订阅后,消息将开始到达messageArrived()回调方法,在该方法中,主题和MqttMessage作为参数传递。 当在messageArrived()中时 ,新到达的消息将排队,并且直到已完全完成回调之后,才会发送对正在处理的消息的确认。 如果您要对消息进行复杂的处理,请使用其他某种机制将数据复制并排入队列,以避免阻塞消息系统。
订阅受建立连接时使用的干净会话标志的影响。 如果会话的清除设置设置为false,则系统应在会话之间保留订阅,并且无需重新订阅。 将clean标志设置为true时,客户端在重新连接时必须重新订阅。 当客户确实订阅了一个主题时,即使订阅的主题查询部分或完全与先前的订阅相交,客户端也会收到与他们所请求的主题匹配的所有保留值。
需要注意的重要一点是,为简单起见,我们仅介绍了API的同步版本,其中,对MQTT API块的每次调用以及按其自己的时间表进行的唯一操作是来自订阅的入站消息。 这个API,MqttClient的版本,是围绕API,MqttAsyncClient,所有调用不阻止更强大的异步版本薄的包装,给他们的结果无论是应用的监控是通过电话或通过返回的令牌完成的操作,该操作将回调到实现IMqttActionListener接口的类。 在进一步开发基于MQTT的应用程序时,值得考虑使用同步API还是异步API更适合您的情况。
通过MQTT提供统计信息
总结一下,我们将展示向Java应用程序添加功能所需的MQTT API很少。 在这种情况下,我们将使用Jetty文档中的示例Jetty FileServer.java示例。 如果我们想计算页面处理程序处理请求的次数,我们只需扩展ResourceHandler类,添加计数代码,并使服务器使用该增强的处理程序而不是默认的处理程序。 在这种情况下,我们还希望添加一些计数功能并启动和停止MQTT客户端:
class CountingResourceHandler extends ResourceHandler { int req_count=0; MqttClient client; public CountingResourceHandler() { super(); } @Override public void doStart() throws Exception { super.doStart(); // Create the MqttClient connection to the broker client=new MqttClient("tcp://localhost:1883", MqttClient.generateClientId()); client.connect(); } public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { super.handle(target, baseRequest, request, response); // Increment the count req_count++; try { // Publish to the broker with a QoS of 0 but retained client.publish("countingjetty/handlerequest", Integer.toString(req_count).getBytes(),0,true ); } catch (MqttException e) { e.printStackTrace(); } } @Override public void doStop() throws Exception { super.doStop(); // Cleanly stop the Mqtt client connection client.disconnect(); } }
这不是可伸缩的示例,因为它已将MqttClient绑定到资源处理程序,但是如果将其合并到Jetty示例中,则每当servlet处理请求时,它将把该计数发布到(在这种情况下)代理在本地主机上。 这里使用MqttClient.generateClientId()生成clientid ,它将使用登录的用户名和一天中的时间来尝试确保不冲突的客户端ID。
请记住,虽然会话的恢复取决于连接之间的客户端ID是否相同,但除非我们记录并重复使用它,否则每次运行的客户端ID都会不同。 默认情况下, MqttClient打开一个“干净”会话。 不要在将干净会话设置为“ false”的情况下使用generateClientId() ,否则,每次客户端启动时,先前会话中的碎片都会留在代理中,因为它无法整理,因为没有匹配的客户端id可以整理反对。
还要注意,我们发布的QoS为0的统计信息是因为我们不担心要交付的统计信息,但是我们还将keep标志设置为true,以便经纪人可以记住任何客户的最新交付价值订阅统计信息。
结语
因此,MQTT和Paho项目为我们提供了一个灵活,轻量级的协议,其中包含Java和C,Lua和其他实现,可以轻松地将其调整为一系列用例,并且对我们如何在其间传递数据没有任何要求。 它是一个功能强大的工具,我们甚至还没有开始在它设计的环境中,将传感器连接到服务器的物联网中查看它-我们将在Paho的实用MQTT的下一部分中讨论它。
paho mqtt
今天的文章paho mqtt_Paho的实用MQTT分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/25009.html