1.生产者
package net.stxy.one.listener.returnlistener;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
public class Producer {
/
* 消息回调监听
* @param args
* @throws IOException
* @throws TimeoutException
*/
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂,并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.创建连接通道
Channel channel = connection.createChannel();
//4.指定消息投递模式:投递成功可监听
//channel.confirmSelect();
//声明
String exchangeName = "test_teturn_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";
//5.发送消息
String msg ="露露,return监听,我喜欢你!";
//6.添加一个return舰艇
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode,//响应码
String replyText,//文本
String exchange,//交换机
String routingKey,//路由
AMQP.BasicProperties properties,//设置
byte[] body)//消息体
throws IOException {
System.out.println("-------handleReturn--------");
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
});
channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());//交换机名称,路由,mandatory:true时路由不到不会删除,且可以被return监听,属性,消息
//channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());//交换机名称,路由,mandatory,属性,消息
}
}
2.消费者
package net.stxy.one.listener.returnlistener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//1.创建连接工厂,并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.创建连接通道
Channel channel = connection.createChannel();
//4.指定消息投递模式:投递成功可监听
//channel.confirmSelect();
//声明
String exchangeName = "test_teturn_exchange";
String routingKey = "return.#";
String routingKeyError = "abc.save";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclareNoWait(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer= new QueueingConsumer(channel);//创建消费者
channel.basicConsume(queueName, true, queueingConsumer);
while(true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}
今天的文章 rabbitMQ——生产者(teturn监听连接),消费者(teturn监听连接)分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/3974.html