rabbitMQ——生产者(teturn监听连接),消费者(teturn监听连接)

rabbitMQ——生产者(teturn监听连接),消费者(teturn监听连接)1 生产者 packagenet stxy one listener returnlisten importjava io IOException importjava util concurrent TimeoutExcep importcom rabbitmq client AMQP importcom rabbitmq client Channel im rabbitmq 生产者监听消费

1.生产者


package net.stxy.one.listener.returnlistener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;

public class Producer {
    /
     * 消息回调监听
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂,并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建连接通道
        Channel channel = connection.createChannel();
        
        //4.指定消息投递模式:投递成功可监听
        //channel.confirmSelect();
        //声明
        String exchangeName = "test_teturn_exchange";
        String routingKey = "return.save";
        String routingKeyError = "abc.save";
        //5.发送消息
        String msg ="露露,return监听,我喜欢你!";
        
        
        //6.添加一个return舰艇
        channel.addReturnListener(new ReturnListener() {
            
            @Override
            public void handleReturn(int replyCode,//响应码
                    String replyText,//文本
                    String exchange,//交换机
                    String routingKey,//路由
                    AMQP.BasicProperties properties,//设置
                    byte[] body)//消息体
                    throws IOException {
                System.out.println("-------handleReturn--------");
                System.out.println("replyCode:"+replyCode);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        });
        
        channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());//交换机名称,路由,mandatory:true时路由不到不会删除,且可以被return监听,属性,消息
        //channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());//交换机名称,路由,mandatory,属性,消息
        
    }
    
}
 


2.消费者


package net.stxy.one.listener.returnlistener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        //1.创建连接工厂,并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建连接通道
        Channel channel = connection.createChannel();
        
        //4.指定消息投递模式:投递成功可监听
        //channel.confirmSelect();
        //声明
        String exchangeName = "test_teturn_exchange";
        String routingKey = "return.#";
        String routingKeyError = "abc.save";

        String queueName = "test_return_queue";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclareNoWait(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer queueingConsumer= new QueueingConsumer(channel);//创建消费者
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true) {
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
    
}
 





 

今天的文章 rabbitMQ——生产者(teturn监听连接),消费者(teturn监听连接)分享到此就结束了,感谢您的阅读。
编程小号
上一篇 2024-10-20 17:11
下一篇 2024-10-20 17:06

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/3974.html