一对一:
首先配置pom:
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
发送方:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class SentProuducter {
public static void main(String[] args)throws Exception {
//1 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
//2 获得链接
Connection connection = connectionFactory.createConnection();
//3 启动连接
connection.start();
//4 获取session 参数:是否启动事物 消息确认模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5 创建队列对象
Queue queue = session.createQueue("test-sent");
//6 创建消息的生产者
MessageProducer producer = session.createProducer(queue);
//7 创建消息
TextMessage textMessage = session.createTextMessage("发送东风41到小日本");
//8 发送消息
producer.send(textMessage);
//9 关闭资源
producer.close();
session.close();
connection.close();
}
}
接收方:
public class ActiveConsumer {
public static void main(String[] args)throws Exception {
//1 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
//2 获得链接
Connection connection = connectionFactory.createConnection();
//3 启动连接
connection.start();
//4 获取session 参数:是否启动事物 消息确认模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5 创建队列对象
Queue queue = session.createQueue("test-sent");
//6 创建消息的接收者
MessageConsumer consumer = session.createConsumer(queue);
//7 监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage message1 = (TextMessage) message;
try {
System.out.println(message1.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8 等待键盘输入
System.in.read();
//9 关闭资源
consumer.close();
session.close();
connection.close();
}
}
一对多:
发送方:
public class ProductsSent {
public static void main(String[] args)throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test-one2two");
MessageProducer producer = session.createProducer(topic);
TextMessage textMessage = session.createTextMessage("祖国万岁,人民万岁");
producer.send(textMessage);
producer.close();
session.close();
connection.close();
}
}
接收方(可以是多个接收方 发送方唯一):
public class TopicConsumer {
public static void main(String[] args)throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test-one2two");
//接收消息
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("我收到了"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
今天的文章消息中间件ActiveMq(代码实现)分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/67127.html