原创
RabbitMQ消息处理
温馨提示:
本文最后更新于 2019年03月21日,已超过 2,002 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我。
消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何保证消息可靠性的呢——消息持久化。
RabbitMQ的消息持久化处理
生产者
application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
mq.config.exchange=log.direct
mq.config.queue.info.routing.key=log.info.routing.key
mq.config.queue.error.routing.key=log.error.routing.key
mq.config.queue.error=log.error
Sender
package com.lzhpo.rabbitmq.rabbitmqdurabledirectprovider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 消息发送者
* @author Administrator
*
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
//exchange 交换器名称
@Value("${mq.config.exchange}")
private String exchange;
//routingkey 路由键
@Value("${mq.config.queue.error.routing.key}")
private String routingkey;
/*
* 发送消息的方法
*/
public void send(String msg){
//向消息队列发送消息
//参数一:交换器名称。
//参数二:路由键
//参数三:消息
this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
}
}
RabbitmqDurableDirectProviderApplicationTests测试类
package com.lzhpo.rabbitmq.rabbitmqdurabledirectprovider;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqDurableDirectProviderApplicationTests {
@Autowired
private Sender sender;
/**
* 测试消息队列
*/
@Test
public void contextLoads() throws Exception {
int flag = 0;
while(true){
flag++;
Thread.sleep(2000);
this.sender.send("Hello RabbitMQ "+flag);
}
}
}
消费者
application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
mq.config.exchange=log.direct
mq.config.queue.info=log.info
mq.config.queue.info.routing.key=log.info.routing.key
mq.config.queue.error=log.error
mq.config.queue.error.routing.key=log.error.routing.key
ErrorReceiver
package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息接收者
* @author Administrator
* @RabbitListener bindings:绑定队列
* @QueueBinding value:绑定队列的名称
* exchange:配置交换器
*
* @Queue value:配置队列名称
* autoDelete:是否是一个可删除的临时队列
*
* @Exchange value:为交换器起个名称
* type:指定具体的交换器类型
*/
@Component
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(value="${mq.config.queue.error}",autoDelete="false"),
exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),
key="${mq.config.queue.error.routing.key}"
)
)
public class ErrorReceiver {
/**
* 接收消息的方法。采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("Error..........receiver: "+msg);
}
}
InfoReceiver
package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息接收者
* @author Administrator
* @RabbitListener bindings:绑定队列
* @QueueBinding value:绑定队列的名称
* exchange:配置交换器
*
* @Queue value:配置队列名称
* autoDelete:是否是一个可删除的临时队列
*
* @Exchange value:为交换器起个名称
* type:指定具体的交换器类型
*/
@Component
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),
exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),
key="${mq.config.queue.info.routing.key}"
)
)
public class InfoReceiver {
/**
* 接收消息的方法。采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("Info........receiver: "+msg);
}
}
Main
package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDurableDirectConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDurableDirectConsumerApplication.class, args);
}
}
测试结果
先启动消费者,再运行RabbitmqDurableDirectProviderApplicationTests
测试类。
RabbitMQ中的消息确认ACK机制
什么是消息确认ACK?
如果在处理消息的过程中,消费者的服务器在处理消息时出现异常,那可能这条正常在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确认ACK。
ACK的消息确认机制?
ACK机制是消费者从RabbitMQ手到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
- 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
- 如果在集群的环境下:RabbitMQ会立即将这个消息推送给这个在线的其它消费者。这种机制保证了再消费者服务端故障的时候,不会丢失任何消息和任务。
- 消息永远不会从RabbitMQ中删除:只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
- 消息的ACK确认机制默认是打开的。
ACK机制的开发注意事项?
如果忘记了ACK,那么后果很严重。当Consumer退出时,Message会一直重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。
修改 Consusmer 配置文件解决 ACK 反馈问题。
在application.properties
配置文件中添加以下
############Rabbitmq的消息确认ACK机制############
#开启重试
spring.rabbitmq.listener.retry.enabled=true
#重试次数,默认为 3 次
spring.rabbitmq.listener.retry.max-attempts=5
- 本文标签: RabbitMQ
- 本文链接: http://www.lzhpo.com/article/8
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权