原创

RabbitMQ消息处理

消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何保证消息可靠性的呢——消息持久化。

RabbitMQ的消息持久化处理

生产者

application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

mq.config.exchange=log.direct

mq.config.queue.info.routing.key=log.info.routing.key

mq.config.queue.error.routing.key=log.error.routing.key

mq.config.queue.error=log.error
Sender
package com.lzhpo.rabbitmq.rabbitmqdurabledirectprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 消息发送者
 * @author Administrator
 *
 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;

    //exchange 交换器名称
    @Value("${mq.config.exchange}")
    private String exchange;

    //routingkey 路由键
    @Value("${mq.config.queue.error.routing.key}")
    private String routingkey;
    /*
     * 发送消息的方法
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数一:交换器名称。
        //参数二:路由键
        //参数三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
    }
}
RabbitmqDurableDirectProviderApplicationTests测试类
package com.lzhpo.rabbitmq.rabbitmqdurabledirectprovider;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqDurableDirectProviderApplicationTests {

    @Autowired
    private Sender sender;

    /**
     * 测试消息队列
     */
    @Test
    public void contextLoads() throws Exception {
        int flag = 0;
        while(true){
            flag++;
            Thread.sleep(2000);
            this.sender.send("Hello RabbitMQ "+flag);
        }
    }

}

消费者

application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

mq.config.exchange=log.direct

mq.config.queue.info=log.info

mq.config.queue.info.routing.key=log.info.routing.key

mq.config.queue.error=log.error

mq.config.queue.error.routing.key=log.error.routing.key
ErrorReceiver
package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接收者
 * @author Administrator
 * @RabbitListener bindings:绑定队列
 * @QueueBinding  value:绑定队列的名称
 *                exchange:配置交换器
 * 
 * @Queue value:配置队列名称
 *        autoDelete:是否是一个可删除的临时队列
 * 
 * @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
            bindings=@QueueBinding(
                    value=@Queue(value="${mq.config.queue.error}",autoDelete="false"),
                    exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),
                    key="${mq.config.queue.error.routing.key}"
            )
        )
public class ErrorReceiver {

    /**
     * 接收消息的方法。采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("Error..........receiver: "+msg);
    }
}
InfoReceiver
package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接收者
 * @author Administrator
 * @RabbitListener bindings:绑定队列
 * @QueueBinding  value:绑定队列的名称
 *                exchange:配置交换器
 * 
 * @Queue value:配置队列名称
 *        autoDelete:是否是一个可删除的临时队列
 * 
 * @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
            bindings=@QueueBinding(
                    value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),
                    exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),
                    key="${mq.config.queue.info.routing.key}"
            )
        )
public class InfoReceiver {

    /**
     * 接收消息的方法。采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("Info........receiver: "+msg);
    }
}
Main
package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqDurableDirectConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDurableDirectConsumerApplication.class, args);
    }

}
测试结果

先启动消费者,再运行RabbitmqDurableDirectProviderApplicationTests测试类。

RabbitMQ中的消息确认ACK机制

什么是消息确认ACK?

如果在处理消息的过程中,消费者的服务器在处理消息时出现异常,那可能这条正常在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确认ACK。

ACK的消息确认机制?

ACK机制是消费者从RabbitMQ手到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

  1. 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
  2. 如果在集群的环境下:RabbitMQ会立即将这个消息推送给这个在线的其它消费者。这种机制保证了再消费者服务端故障的时候,不会丢失任何消息和任务。
  3. 消息永远不会从RabbitMQ中删除:只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  4. 消息的ACK确认机制默认是打开的。

ACK机制的开发注意事项?

如果忘记了ACK,那么后果很严重。当Consumer退出时,Message会一直重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。

修改 Consusmer 配置文件解决 ACK 反馈问题。

application.properties配置文件中添加以下

############Rabbitmq的消息确认ACK机制############
#开启重试
spring.rabbitmq.listener.retry.enabled=true
#重试次数,默认为 3 次
spring.rabbitmq.listener.retry.max-attempts=5
正文到此结束