原创
RabbitMQ六种消息模式
温馨提示:
本文最后更新于 2019年03月21日,已超过 2,002 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我。
pom依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
RabbitMQ的连接工具(我单独写出来了一个工具类,方便使用):
package com.lzhpo.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <p> Author:lzhpo </p>
* <p> Title:获取Rabbitmq的连接工具</p>
* <p> Description:</p>
*/
public class ConnectionUtils {
/**
* 获取Rabbitmq的连接
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//AMQP 5672
factory.setPort(5672);
//vhost
factory.setVirtualHost("/");
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
return factory.newConnection();
}
}
简单队列
简单队列:一个生产者P发送消息到队列Q,一个消费者C接收。
生产者(Send)
package com.lzhpo.rabbitmq.model5.simplequeues;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* <p> Author:lzhpo </p>
* <p> Title:生产者生产消息</p>
* <p> Description:</p>
*/
public class Send {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//需要发送的消息
String msg = "hello simple!";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("---send msg:" +msg);
//关闭
channel.close();
connection.close();
}
}
消费者(Recv)
package com.lzhpo.rabbitmq.model5.simplequeues;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:消费者获取消息</p>
* <p> Description:</p>
*/
public class Recv {
private static final String QUEUE_NAME = "test_simple_queue";
/**
* main()入口
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
//oldApi();//老版本api
newApi();//新版本api
}
/**
* 新版本api
*/
private static void newApi() throws Exception{
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("new api recv:" + msg);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
/**
* 老版本api
* @throws Exception
*/
private static void oldApi() throws Exception{
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msgString = new String(delivery.getBody());
System.out.println("[recv] msg: " +msgString);
}
}
}
工作队列
轮询分发
【轮询分发】:结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个的分。
生产者(Send)
package com.lzhpo.rabbitmq.model5.workqueues.lunxun;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* <p> Author:lzhpo </p>
* <p> Title:【轮询分发】</p>
* <p> Description:
* 备注:消费者 1 我们处理时间是 1s ;而消费者 2 中处理时间是 2s;
* 但是我们看到的现象并不是 1 处理的多 消费者 2 处理的少。
* -----------------------------------
* 消费者1【Recv1】:
* [1] Received '.0'
* [x] Done
* [1] Received '.2'
* [x] Done
* ......
* [1] Received '.46'
* [x] Done
* [1] Received '.48'
* [x] Done
* 消费者 1 将偶数部分处理掉了
* -----------------------------------
* 消费者2【Recv2】:
* [2] Received '.1'
* [x] Done
* [2] Received '.3'
* [x] Done
* ......
* [2] Received '.47'
* [x] Done
* [2] Received '.49'
* [x] Done
* 消费者 2 中将奇数部分处理掉了。
* -----------------------------------
* 我想要的是 1 处理的多,而 2 处理的少
* 测试结果:
* 1.消费者 1 和消费者 2 获取到的消息内容是不同的,同一个消息只能被一个消费者获取
* 2.消费者 1 和消费者 2 货到的消息数量是一样的 一个奇数一个偶数
* 按道理消费者 1 获取的比消费者 2 要多
* -----------------------------------
* 这种方式叫做【轮询分发】:结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个的分。
* -----------------------------------
* </p>
*/
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
//消息内容
String message = "." +i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" +message +"'");
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
}
消费者1(Recv1)
package com.lzhpo.rabbitmq.model5.workqueues.lunxun;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义一个消息的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" [1] Received '" + message + "'");
try {
doWork(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; //消息的确认模式自动应答
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) throws InterruptedException {
Thread.sleep(1000);
}
@SuppressWarnings("unused")
public static void oldAPi() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态false 自动true 自动应答 不需要手动确认
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
消费者2(Recv2)
package com.lzhpo.rabbitmq.model5.workqueues.lunxun;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义一个消息的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" [2] Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; //
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
公平分发
使用公平分发,必须关闭自动应答,改为手动应答。
生产者(Send)
package com.lzhpo.rabbitmq.model5.workqueues.gongping;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* <p> Author:lzhpo </p>
* <p> Title:【公平分发】</p>
* <p> Description:
* 虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有 2 个消费者,所有的偶数的消息都是繁忙的,而
* 奇数则是轻松的。按照轮询的方式,偶数的任务交给了第一个消费者,所以一直在忙个不停。奇数的任务交给另一
* 个消费者,则立即完成任务,然后闲得不行。
*
* 而 RabbitMQ 则是不了解这些的。他是不知道你消费者的消费能力的,这是因为当消息进入队列,RabbitMQ 就会分派
* 消息。而 rabbitmq 只是盲目的将消息轮询的发给消费者。你一个我一个的这样发送.
*
* 为了解决这个问题,我们使用 basicQos( prefetchCount = 1)方法,来限制 RabbitMQ 只发不超过 1 条的消息给同
* 一个消费者。当消息处理完毕后,有了反馈 ack,才会进行第二次发送。(也就是说需要手动反馈给 Rabbitmq )
*
* 还有一点需要注意,使用【公平分发】,必须关闭自动应答,改为手动应答。
*
*
* 这时候现象就是消费者 1 速度大于消费者 2
* </p>
*/
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 1;
//每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息
//限制发给同一个消费者不得超过1条消息
channel.basicQos(prefetchCount);
// 发送的消息
for (int i = 0; i < 50; i++) {
String message = "." + i;
// 往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
消费者1(Recv1)
package com.lzhpo.rabbitmq.model5.workqueues.gongping;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);//保证一次只分发一个
//定义一个消息的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" [1] Received '" + message + "'");
try {
doWork(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
/**手动应答**/
boolean autoAck = false; //手动确认消息
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) throws InterruptedException {
Thread.sleep(1000);
}
}
消费者2(Recv2)
package com.lzhpo.rabbitmq.model5.workqueues.gongping;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:
* Message acknowledgment(消息应答):
*
* boolean autoAck = false;
* channel.basicConsume(QUEUE_NAME, autoAck, consumer);
*
* boolean autoAck = true;(自动确认模式)一旦 RabbitMQ 将消息分发给了消费者,就会从内存中删除。
* 在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消
* 费者但尚未处理的消息。
*
* boolean autoAck = false; (手动确认模式) 我们不想丢失任何任务,如果有一个消费者挂掉了,那么
* 我们应该将分发给它的任务交付给另一个消费者去处理。 为了确保消息不会丢失,RabbitMQ 支持消
* 息应答。消费者发送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了。RabbitMQ 可
* 以删除它了。
*
* 消息应答是默认打开的。也就是 boolean autoAck =false;
*
* </p>
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);//保证一次只分发一个
//定义一个消息的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" [2] Received '" + message + "'");
try {
doWork(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);/**关闭自动确认应答,手动应答**/
}
}
};
/**关闭自动应答**/
boolean autoAck = false; //关闭自动确认
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) throws InterruptedException {
Thread.sleep(2000);
}
public static void oldAPi() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);/**关闭自动确认应答,手动应答**/
}
}
}
消息订阅模式
【订阅模式】:一个消息被多个消费者消费。
- 一个生产者,多个消费者。
- 每一个消费者都有自己的队列。
- 生产者没有直接把消息发送到队列,而是发送到了交换机、转发器exchange。
- 每个队列都要绑定到交换机上。
- 生产者发送的消息经过交换机到达队列,就能实现一个消息被多个消费者消费。
生产者(Send)
package com.lzhpo.rabbitmq.model5.subscribeModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:
* 先运行Send创建交换器
*
* 但是这个发送的消息到哪了呢? 消息丢失了!!!因为交换机没有存储消息的能力,在 rabbitmq 中只有队列存储消息的
* 能力.因为这时还没有队列,所以就会丢失;
* 小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!
*
* 【订阅模式】:一个消息被多个消费者消费。
* 1.一个生产者,多个消费者。
* 2.每一个消费者都有自己的队列。
* 3.生产者没有直接把消息发送到队列,而是发送到了交换机、转发器exchange
* 4.每个队列都要绑定到交换机上
* 5.生产者发送的消息经过交换机到达队列,就能实现一个消息被多个消费者消费。
*
* 邮件->注册->短信
*
* </p>
*/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明exchange 交换机 转发器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //订阅模式
// 消息内容
String msg = "Hello PB";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("Send: " +msg);
channel.close();
connection.close();
}
}
消费者1(Recv1)
package com.lzhpo.rabbitmq.model5.subscribeModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_fanout_email";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 绑定队列到交换机 **/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//------------下面逻辑和work模式一样-----
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[1] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消费者2(Recv2)
package com.lzhpo.rabbitmq.model5.subscribeModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_fanout_sms";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 绑定队列到交换机 **/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
路由模式
- 发送消息到交换机并且要指定路由key 。
- 消费者将队列绑定到交换机时需要指定路由key。
生产者(Send)
package com.lzhpo.rabbitmq.model5.routingModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:
* 先运行Send创建交换器
* </p>
*/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String msg = "hello direct!";
//routingKey
//String routingKey = "error";//error两个都可以收到
//String routingKey = "info";//info只有Recv2能收到
String routingKey = "warning";//warning只有Recv2能收到
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("-------------send: " +msg);
channel.close();
connection.close();
}
}
消费者1(Recv1)
package com.lzhpo.rabbitmq.model5.routingModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 绑定队列到交换机 **/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[1] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消费者2(Recv2)
package com.lzhpo.rabbitmq.model5.routingModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 绑定队列到交换机 **/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
主题模式
Topic主题模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。
生产者(Send)
package com.lzhpo.rabbitmq.model5.topicModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:
* Topic主题模式:将路由键和某种模式匹配。
* </p>
*/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "id=1001";
channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1(Recv1)
package com.lzhpo.rabbitmq.model5.topicModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消费者2(Recv2)
package com.lzhpo.rabbitmq.model5.topicModel;
import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_topic_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 绑定队列到交换机 **/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");//全匹配:item.#
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
RPC远程调用模式
前面学习了如何使用work队列在多个worker之间分配任务,但是如果需要在远程机器上运行个函数并等待结果,就需要使用RPC(远程过程调用)模式来实现。
参考官网教程【模拟RPC服务来返回斐波那契数列】:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
- 本文标签: RabbitMQ
- 本文链接: http://www.lzhpo.com/article/9
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权