jms消息队列协议以及activemq的使用

jms消息队列协议以及activemq的使用jms协议一、jms是什么?jms是javamessageservice的缩写,翻译过来就是“java消息服务”,这样说依然有点抽象,有点不明觉厉,我也一直在想该怎么解释jms

jms协议

一、jms是什么?

        jms是java  message  service的缩写,翻译过来就是“java消息服务”,这样说依然有点抽象,有点不明觉厉,我也一直在想该怎么解释jms。还是拿我们最熟悉的jdbc来说,jdbc是什么?jdbc是Java DataBase Connectivity,java数据库连接,在java的jdk里面,有一个包:java.sql.*,这个包下面定义了一系列的接口,用来定义了向数据库插入记录的时候使用的是什么方法,从数据库获取记录的时候又是什么方法。。。。总的来说,在java.sql.*包下面就是定义了一套规范,这套规范约束(定义)了如何操作数据库,但是仅仅是规范而已,因为都是借口,具体怎么实现这些接口,每种数据库(mysql,db2,oracle….)有自己的方法,比如mysql就实现了,就是我们经常用到的mysql-connector-java-5.1.5-bin.jar。jms也可以这样来理解,在jdk的javax.jms.*包下面定义了一系列的接口,这些接口也是一套规范,这套jms规范定义了如何使用java的消息服务,这些接口的实现是由消息中间件厂商来实现的,比如activemq,oracle aq,ibm websphere mq等。

二、jms是用来干嘛的?

        比如北京有个系统a,上海有个系统b,两个系统需要交互,或者广州还有个系统c,也要和a交互,这时就可以考虑使用jms服务了,几个系统的开发者约定好了使用一种消息中间件,比如开源的activemq吧,a向activemq发送b和c需要的数据,b和c直接去activemq里面去取就可以了,在这里消息中间件就相当于一个桥梁一样连接了各个系统。

三、jms消息传送的分类

         其实,在jms出现之前,早就已经存在上面所述的多个系统间交互数据的问题,大家普遍采取的方案是“点对点(point2point)”或者“订阅/发布(Subscriber/publish)”,java结合这种现状,把这两种情况抽象成了jms接口,任何的消息中间件可以选择实现其中的任一种模式,当然也可以都实现,这两者之间没有关系,只是消息的发布与接收的方式不同而已。

点对点:简称p2p,不过不是理财的p2p哦,它的特点是:消息发布后只能由一个消费者来接收,你可以有多个消费者,但是一条消息被任一个消费者消费之后,其他消费者将无从获取,因此成为点对点。

订阅/发布:一个消息可以传递给多个订阅者,比如上面的a发送的数据可以被b和c订阅(消费)。

java操作activemq之点对点方式

下面代码需要使用到activemq的jar包activemq-all-5.8.0.jar,也在activemq的安装包里面。启动成功之后,访问http://localhost:8161/admin/,输入activemq默认的用户名和密码admin/admin,界面如下:

image.png

        是用java操作activemq之点对点方式就是Queue的方式,即队列,我们知道队列的特性是先进先出,在这里jms把点对点的消息方式命名为Queue不无道理,在jms里面Queue和发布/订阅模式的Topic两个接口都是继承自javax.jms.Destination接口。

先看java程序如何使用jms编程规范创建一个点对点消息消费的消息提供者(消息生产者MessageProducer):

package cn.jms.producer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /**  * 点对点的方式,先创建消息  * activemq:http://localhost:8161/admin/ 用户名/密码:admin/admin  * @author zhao  *  */ public class Producer { public static void main(String[] args) { ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection =null; try {  connection = factory.createConnection(); connection.start(); //true表示加入事务控制 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("firstQueue"); MessageProducer producer=session.createProducer(queue);//此处queue是Destination //可以创建字符串类型的、对象类型的等等 for (int i = 0; i < 10; i++) { System.out.println("生产者发送第"+i+"个消息"); TextMessage textMessage = session.createTextMessage("机密数据"+i); producer.send(textMessage); } session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ if (connection!=null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }

运行生产者之后,在activemq的控制台查看Queues的变化:

image.png

有了10个消息了,没错,接下来编写一个消息的消费者(MessageConsumer)来消费这10个消息:

package cn.jms.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /**  * 点对点方式的消费者  *   * @author zhao  *  */ public class Consumer1 { public static void main(String[] args) { ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection =null; try {  connection = factory.createConnection(); connection.start(); //这里不用加事务了,是与生产着不同的地方 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("firstQueue"); //此处queue是Destination,与生产着不同的是此处创建的时消费者 MessageConsumer consumer=session.createConsumer(queue); int count=1; while(true){ TextMessage message = (TextMessage)consumer.receive();//receive()每100s接受一次 if (message!=null) { System.out.println("接收到了消息:"+message.getText()+"第"+count+"次"); count++; }else { break; } } } catch (JMSException e) { e.printStackTrace(); }finally{ if (connection!=null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }

运行结果如下:

image.png

查看activemq的消息控制台,看看有什么变化呢?

image.png

可以看到,pending(等待被消费的消息)的数量由原来的10变为了0,consumers的数量由0–》1,enqueued(被消费的消息)的数量由0–>10,dequeued(从queue中移除的消息)从0变为10,说明消息被正确消费了。

不知道大家有没有注意到,上面我是先运行消息的生成者MessageProducer,再运行的消费者,如果先运行消费者呢?

image.png

因为这时候并没有消息可以被消费,消费者就一直等待在这里,因为consumer.receive()方法是同步的,如果接受不到消息,程序将会一直处于阻塞状态。之后,我们可以再运行生产者,消费者依然可以接收到消息:

image.png

同样的activemq的控制台也发生了变化:

image.png

依此,我们总结一下点对点消息消费模型的特点:

①、一个生产者发送的消息只能被一个消费者消费(你可以再复制一个消息消费者MessageConsumer试试哦)

②、当消息发送者发送消息的时候,无论接收者程序在不在运行,MessageConsumer都能获取到消息

③、消费者与生产者并不依赖,运行没有先后的顺序,不管谁先运行都能接受到消息

java操作activemq之监听器方式实现点对点模型的消费者

 上篇文章给出的消费者是通过while循环的方式来实现的,这种方式并不推荐,jms的官方写法是通过实现MessageListener这个接口来创建一个监听器来接收消息,生产者的代码是不需要改变的,我们把上篇文章的消费者代码修改一下,修改为实现MessageListener这个接口的方式来接收消息,最终的java代码如下:

package cn.jms.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer2 { public static void main(String[] args) { ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); try { Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("firstQueue"); MessageConsumer consumer=session.createConsumer(queue); JiMiMessageLisener jiMiMessageLisener = new JiMiMessageLisener(); consumer.setMessageListener(jiMiMessageLisener);//注册消息监听器 } catch (JMSException e) { e.printStackTrace(); } //cosumer和session和connection也要关闭,如果可以关闭的话 } } class JiMiMessageLisener implements MessageListener{ @Override public void onMessage(Message arg0) { TextMessage message = (TextMessage)arg0; try { System.out.println("监听到消息:"+message.getText()); } catch (JMSException e) { e.printStackTrace(); } } }

运行生产者发送消息之后,再运行消费者Consumer2(点对点消息模型queue先运行生产者还是消费者并没有影响),可以看到接收到了消息:

image.png

activemq的控制台也发生了变化:

image.png

这里再强调一下点对点消息模型:

*点对点方式的消费者:可以通过while循环实现,也可以通过监听器方式来实现。

 * 如果一个生产者产生消息了,一个消费者在接收或者监听,消息一产生就被接收到了,第二个消费者根本无从接收,接收不到任何消息,也不报错,所以是点对点。

java操作activemq之发布订阅模式

发布/订阅模式和点对点模式有点不同,就是前者需要先运行订阅者向activemq注册,先订阅了某个topic(主题),之后再有生产者向这个topic发送消息才行。

        发布订阅模式的生产者代码:

package cn.jms.producer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /**  消息发布者,要先运行订阅,再运行发布者  * activemq:http://localhost:8161/admin/ 用户名/密码:admin/admin  * @author zhao  *队列和主题这两种消息的消费者都在swt工程里  */ public class ProducerTopic { public static void main(String[] args) { ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection =null; Session session = null; MessageProducer producer=null; try {  connection = factory.createConnection(); connection.start(); //true表示加入事务控制  session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic createTopic = session.createTopic("firstTopic");//这里不再是createQueue  producer=session.createProducer(createTopic); // producer.setDeliveryMode(DeliveryMode.PERSISTENT);  System.out.println("NON_PERSISTENT:"+DeliveryMode.NON_PERSISTENT); System.out.println("现在默认是:"+producer.getDeliveryMode()); // producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //可以创建字符串类型的、对象类型的等等 for (int i = 0; i < 10; i++) { System.out.println("生产者发布第"+i+"个消息呀"); TextMessage textMessage = session.createTextMessage("机密数据呀"+i); producer.send(textMessage); } session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ //producer和session也要关闭 if (producer!=null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session!=null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection!=null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }

发布订阅模式的消费者1的代码:

package cn.jms.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /**  *确保了1.producer.setDeliveryMode(DeliveryMode.PERSISTENT);这样就会把发布的消息以kahadb的方式存储起来  *2。消费者设置id,创建持久消费者,依然要先运行一下,相当于注册一下,其后是否一直运行无所谓  *3.只要发了持久化消息,再次运行这种方式创建的消费者的时候就能收到消息了(重启mq后控制台可能看不到持久化消息,但是运行时还是  *能消费到的,应该在kahadb里面存着)  */ public class TopicConsumer1 { public static void main(String[] args) { ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); try { Connection connection = factory.createConnection(); connection.setClientID("consumer1"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic createTopic = session.createTopic("firstTopic"); TopicSubscriber topicSubscriber= session.createDurableSubscriber(createTopic, "subscriber1"); JiMiMessageLisenerTopic1 jiMiMessageLisener = new JiMiMessageLisenerTopic1(); topicSubscriber.setMessageListener(jiMiMessageLisener);//注册消息监听器 } catch (JMSException e) { e.printStackTrace(); } } } class JiMiMessageLisenerTopic1 implements MessageListener{ @Override public void onMessage(Message arg0) { TextMessage message = (TextMessage)arg0; try { System.out.println("消费者1订阅到消息:"+message.getText()); } catch (JMSException e) { e.printStackTrace(); } } }

发布订阅模式的消费者2的代码,其实跟消费者1的代码一样,只是写2遍罢了,但是我这里为了尽可能多说点呢,用的不再是javax.jms.TopicSubscriber这个类来做消费者,而是javax.jms.MessageConsumer这个类,其实都一样因为javax.jms.TopicSubscriber是继承自javax.jms.MessageConsumer这个接口的:

package cn.jms.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /**  * @author zhao  *  */ public class TopicConsumer2 { public static void main(String[] args) { ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); try { Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic createTopic = session.createTopic("firstTopic"); MessageConsumer consumer=session.createConsumer(createTopic); JiMiMessageLisenerTopic2 jiMiMessageLisener = new JiMiMessageLisenerTopic2(); consumer.setMessageListener(jiMiMessageLisener);//注册消息监听器 } catch (JMSException e) { e.printStackTrace(); } } } class JiMiMessageLisenerTopic2 implements MessageListener{ @Override public void onMessage(Message arg0) { TextMessage message = (TextMessage)arg0; try { System.out.println("消费者2订阅到消息:"+message.getText()); } catch (JMSException e) { e.printStackTrace(); } } }

注意,一定要先运行消费者,再运行生产者,不然是看不到发布/订阅的效果的:

image.png

发布/订阅模型的特点:

(1)、一个生产者发布的消息可以给多个订阅者来消费(上例中的TopicConsumer1和TopicConsumer2)

(2)、发布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态才能接收消息。

今天的文章
jms消息队列协议以及activemq的使用分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/80510.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注