> 文章列表 > Rabbitmq死信交换机

Rabbitmq死信交换机

Rabbitmq死信交换机

正常的Rabbitmq流程是生产者把消息先到交换机,交换机分发到队列,然后消费者从队列中取出消息

死信交换机就是给消息设置过期时间TTL,然后将正常的队列绑定死信交换机,死信交换机绑定一个新的专门承接死信消息的队列,以待取出

生产者

private static String NORMAL_EXCHANGE_NAME = "normal_exchange_ttl";
 
    //生产者
    public static void main(String[] args) throws Exception {
        //1、获取connection
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();
        for (int i = 0; i < 5; i++) {
            sendMsg(channel);
        }
        //4、关闭管道和连接
        channel.close();
        connection.close();
    }
 
    private static void sendMsg(Channel channel) throws IOException, InterruptedException {
        // 1. 设置消息 TTL 过期时间
        // 2. 设置 队列 TTL 过期时间
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("1000").build();
        String message = "info";
        channel.basicPublish(NORMAL_EXCHANGE_NAME, "normal-key", properties, message.getBytes());
        System.out.println("消息发送完成:" + message);
    }
 

private static String NORMAL_EXCHANGE_NAME = "normal_exchange_ttl";
    private static String NORMAL_QUEUE_NAME = "normal_queue_ttl";
    private static String DEAD_EXCHANGE_NAME = "dead_exchange";
    private static String DEAD_QUEUE_NAME = "dead-queue";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();
        //3. 创建死信队列与交换机及绑定关系
        handleQueueAndBinding(channel, DEAD_QUEUE_NAME, null, DEAD_EXCHANGE_NAME, "dead-letter-key");
 
        // 正常队列与死信交换机的绑定关系
        Map<String, Object> deadLetterParams = getNormalAndDeadParams();
 
        // 4.声明一个正常队列与交换机及绑定关系
        handleQueueAndBinding(channel, NORMAL_QUEUE_NAME, deadLetterParams, NORMAL_EXCHANGE_NAME, "normal-key");
 
        channel.basicQos(1);
 
        //5.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Normal消费者接收消息: " + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
 
        channel.basicConsume(NORMAL_QUEUE_NAME, false, consumer);
 
        System.out.println("Normal消费者启动接收消息......");
 
        //5、键盘录入,让程序不结束!
        System.in.read();
 
        //6、释放资源
        channel.close();
        connection.close();
 
    }
 
    private static Map<String, Object> getNormalAndDeadParams() {
        Map<String, Object> deadLetterParams = new HashMap<>();
        deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
        //队列过期时间限制
        //deadLetterParams.put("x-message-ttl", 1000);
        return deadLetterParams;
    }
 
    /**
     * 处理队列与绑定关系
     *
     * @param channel
     * @param deadQueueName
     * @param o
     * @param deadExchangeName
     * @param routingKey
     * @throws IOException
     */
    private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map<String, Object> o, String deadExchangeName, String routingKey) throws IOException {
        // 声明一个队列
        channel.queueDeclare(deadQueueName, false, false, false, o);
        // 声明一个交换机
        channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
        // 队列与交换机绑定
        channel.queueBind(deadQueueName, deadExchangeName, routingKey);
    }