官方案例:https://www.rabbitmq.com/getstarted.html
本篇代码:https://gitee.com/touwowo/RabbitMQ
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最 主流的消息中间件之一。 优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高。
拉取镜像
docker pull rabbitmq
运行
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
安装管理界面的插件
# 查看运行的容器
docker ps
# 进入容器
docker exec -it 镜像ID /bin/bash
# 打开rabbitmq_management的插件
rabbitmq-plugins enable rabbitmq_management
# 退出容器
exit
添加自己的账户(也可以直接使用 guest/guest)
# 进入容器
docker exec -it 镜像ID /bin/bash
# 创建用户 admin为账号 123为密码
rabbitmqctl add_user admin 123
# 设置用户角色 admin为用户名
rabbitmqctl set_user_tags admin administrator
# 设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 退出
exit
创建完成后就可以使用自定义的用户名和密码进行登录。
我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。我们将介绍 Java API 中的一些细节。
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代 表使用者保留的消息缓冲区
创建maven工程,这里就不再演示了。
<dependencies>
<!--rabbitmq-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
<!--IO-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.10.0</version>
</dependency>
</dependencies>
// com/touwowo/one/Producer.java
package com.touwowo.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 创建连接工厂,设置HOST,username、password信息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
// 获取连接
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 声明队列
* 参数解释:
* 1、队列名称
* 2、队列里面的消息是否进行持久化(保存到磁盘) 默认把消息存在内存中
* 3、该队列是否只供一个消费者进行消费 true表示可以多个消费者消费,false表示只能一个消费者消费
一般为false,保证能够共享
* 4、是否自动删除 在最后一个消费者断开连接后,该队列是否自动删除
* 5、其他参数,后面学习会用到
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 发布消息
*/
String message = "Hello World!";
/**
* 1、发送消息到哪个交换机,""表示默认交换机
* 2、路由的key ,本次是队列名
* 3、其他参数信息
* 4、发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// com/touwowo/one/Consumer.java
package com.touwowo.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 创建连接工厂,设置HOST,username、password信息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
/**
* 定义消费成功回调函数,RabbitMQ异步推送消息,以对象的形式返回给回调
* 因为 DeliverCallback 是函数式接口,直接使用Lambda表达式进行简化
*/
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println(new String(message.getBody()));
};
/**
* 定义未成功消费的回调函数
*/
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费失败。");
};
// 获取连接
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
// 获取指定队列的消息,最简单形式
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
哎哎哎,有人就要问了,在程序结束时咋不关闭连接呢?比如channel.close()
和connection.close()
。
这是因为希望进程在消费者异步侦听消息到达时保持活动状态。通俗点,如果关闭了,就相当于断开与RabbiMQ的连接,就不能进行 “消费”。
先运行Producer.java
,声明队列并发送消息。注意:必须先声明队列!!
再运行Consumer.java
,获取队列中的信息。
结果如下:
Hello World!
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
如上所示,存在一个生产者以及两个消费者,两个消费者同时消费同一个队列的消息。
咱们抽取一个工具类哈,直接返回Channel
。
// com/touwowo/utils/RabbitMqUtils.java
package com.touwowo.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
return connection.createChannel();
}
}
创建生产者,通过向控制台输入消息,达到向队列发送消息的目的。
// com/touwowo/two/Task.java
package com.touwowo.two;
import com.rabbitmq.client.Channel;
import com.touwowo.utils.RabbitMqUtils;
import java.util.Scanner;
public class Task {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// 详细解释请看 Hello World 板块
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
}
}
}
消费者代码:
// com/touwowo/two/Worker.java
package com.touwowo.two;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.touwowo.utils.RabbitMqUtils;
public class Worker {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("收到消息:"+new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者取消消费"+consumerTag);
};
// 修改这的内容
System.out.println("工作线程 1 正在运行..........");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
因为存在两个消费者,所以复制以上代码,修改标记的内容。得到第二个消费者
先运行Task.java
,创建队列hello
,然后运行两个工作线程,达到上面架构图的那种效果。进行测试。
Task.java输入
1
2
3
4
消费者 1
工作线程 1 正在运行..........
收到消息:1
收到消息:3
消费者 2
工作线程 2 正在运行..........
收到消息:2
收到消息:4
很明显,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。
还有一个问题,我们怎么确定消费者是否消费成功?如果中途消费者停掉,那这条消息该怎么处理呢?
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
修改消费者代码,让消费者 2 睡眠 30s,模拟处理数据的过程
// com/touwowo/two/Worker.java
package com.touwowo.two;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.touwowo.utils.RabbitMqUtils;
public class Worker {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
SleepUtils.sleep(30); //代码如下,睡眠30s
System.out.println("收到消息:"+new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者取消消费"+consumerTag);
};
System.out.println("工作线程 2 正在运行..........");
// 第二个参数就是 是否开启自动应答,true表示开启,false表示关闭
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
SleepUtils.java
package com.touwowo.utils;
public class SleepUtils {
public static void sleep(int second) {
try {
Thread.sleep(1000 * second);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
启动生产者代码和消费者代码,在消费者2
处理消息的时候,停止程序。
Task.java输入
1
2 //输入后在30s内终止消费者2
消费者 1
工作线程 1 正在运行..........
收到消息:1
收到消息:2
消费者 2
工作线程 2 正在运行..........
当消费者2处理消息中断时,会将未成功消费的消息重新返回队列,让下一个消费者进行处理。
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上进行修改。如下所示。
package com.touwowo.three;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.touwowo.utils.RabbitMqUtils;
import com.touwowo.utils.SleepUtils;
public class Worker {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("工作线程 2 已经启动:");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("收到的消息:"+new String(message.getBody())+" consumerTag:"+consumerTag);
/*
*参数一:用于消息确认的标记
*参数二:是否批量应答
* 这里我们不采用批量应答,处理完成一条消息就确认一条
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag -> {
System.out.println("消费者消费取消。");
});
}
}
如果需要批量应答,可以设置multiple=false
,就是将basicAck
的第二个参数设置为true
。
手动应答的好处是可以批量应答并且减少网络拥堵。在一定程度上可以提高消息处理消息的效率。
刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
将第二个参数设置为true
,意味着 RabbitMQ 会将队列进行持久化,如果 RabbitMQ 在意外情况下关闭,可保证队列不丢失,但是如果其中的消息不进行持久化,队列中的消息依然会丢失!
channel.basicPublish("",QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, //消息持久化
message.getBytes(StandardCharsets.UTF_8)
);
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。
在上面的测试中,队列采用 “轮询” 的方式将消息发放给消费者,但是这消费者处理消息的速度有快有慢,如果采用以前的方式进行消费,这势必会影响处理消息的速度,所以可以通过设置prefetch
来达到不公平分发的效果。
channel.basicQos(2); // 设置预取值为2,定义通道上允许的未确认消息的最大数量
相当于确定了一个缓冲空间,它的大小为 2 ,但其中未确认的消息的达到2条时,就不再向它分发消息。
注意:这是未确认
的消息!也就是没有应答的消息。
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
交换机常用几种类型:
直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
对以前的知识进行补充。
以上的程序都没有使用交换机,空字符串表示无名交换机或者默认交换机,生产者可以直接通过指定routingKey
将消息发送到队列
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); //声明队列时使用无名交换机
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); //消费端直接指定消费队列
channel.exchangeDeclare("logs", "fanout");
faonout 交换机非常简单。正如您可能从名称中猜到的那样,它只是将它收到的所有消息广播到它知道的所有队列。
每次测试时,我们都声明队列,这很麻烦,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
在 Java 客户端中,当我们不向 queueDeclare()
提供参数时,我们会创建一个具有生成名称的非持久、独占、自动删除队列:
String queueName = channel.queueDeclare().getQueue();
此时 queueName
包含一个随机队列名称。例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg
我们已经创建了一个 fanout 交换和一个队列。现在我们需要告诉交换器向我们的队列发送消息。交换和队列之间的这种关系称为绑定。
绑定的方法如下所示:
channel.queueBind(queueName, "logs", "")
发出日志消息的生产者程序与之前的教程看起来没有太大区别。最重要的变化是我们现在想要将消息发布到我们的日志交换而不是无名的交换。我们需要在发送时提供一个 routingKey,但它的值在 fanout 交换时被忽略。
生产者:
package com.touwowo.five;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.touwowo.utils.RabbitMqUtils;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class EmitLog {
//申明交换机名称
private static final String EXCHANGE_NAME = "log";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机类型为 fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes(StandardCharsets.UTF_8));
}
}
}
消费者:
package com.touwowo.five;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.touwowo.utils.RabbitMqUtils;
public class ReceiveLogs1 {
//申明交换机名称
private static final String EXCHANGE_NAME = "log";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
/*
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queue = channel.queueDeclare().getQueue();
// 绑定临时队列到交换机
channel.queueBind(queue,EXCHANGE_NAME,"");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("工作线程 1 收到的消息为:"+new String(message.getBody()));
};
channel.basicConsume(queue,true,deliverCallback,(consumerTag -> {}));
}
}
无论我们创建多少个消费者,只要临时队列与 fanout 交换机绑定,消费者都可以收到生产者发送的消息,这也可以理解为一个广播,可以向很多个队列同时发送消息。
绑定可以采用额外的 routingKey 参数。为了避免与 basic_publish 参数混淆,我们将称其为绑定键。这是我们如何使用键创建绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
如black
就是 routingKey
,可以把它当做一个路由,交换机通过生产者指定的 routingKey
向指定的队列发送消息,这就是与 fanout 交换机的不同之处。
在这个设置中,我们可以看到绑定了两个队列的直接交换机 X
。第一个队列与绑定键 orange
绑定,第二个队列有两个绑定,一个绑定键为 black
,另一个为 green
。
在这样的设置中,使用路由键 orange
发布到交换机的消息将被路由到队列 Q1。路由键为 black
或 green
的消息将转到 Q2。所有其他消息将被丢弃。
使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键 black
在 X
和 Q1
之间添加一个绑定。在这种情况下,直接交换的行为类似于扇出,并将消息广播到所有匹配的队列。路由键为 black
的消息将同时发送到 Q1
和 Q2
。
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
这次我们声明的交换机的类型为direct
,目的是生产者通过指定的routingKey
向指定的队列发送消息。
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info","普通 info 信息");
bindingKeyMap.put("warning","警告 warning 信息");
bindingKeyMap.put("error","错误 error 信息");
bindingKeyMap.put("debug","调试 debug 信息");
通过以上的测试数据,观察结果。验证结果是否能达到预期。
package com.touwowo.six;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.touwowo.utils.RabbitMqUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class EmitLogDirect {
//申明交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机,类型为 direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 测试数据
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info","普通 info 信息");
bindingKeyMap.put("warning","警告 warning 信息");
bindingKeyMap.put("error","错误 error 信息");
bindingKeyMap.put("debug","调试 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
//通过指定的 routingKey 向队列发送消息
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes(StandardCharsets.UTF_8));
}
}
}
package com.touwowo.six;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.touwowo.utils.RabbitMqUtils;
public class ReceiveLogsDirect01 {
//申明交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 获取临时队列
String queue = channel.queueDeclare().getQueue();
// 声明直接交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 将队列与交换机进行绑定 , routingKey为 info 和 warning
channel.queueBind(queue,EXCHANGE_NAME,"info");
channel.queueBind(queue,EXCHANGE_NAME,"warning");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("收到的消息为:"+new String(message.getBody()));
};
channel.basicConsume(queue,false,callback,consumerTag -> {});
}
}
package com.touwowo.six;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.touwowo.utils.RabbitMqUtils;
public class ReceiveLogsDirect02 {
//申明交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queue = channel.queueDeclare().getQueue();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(queue,EXCHANGE_NAME,"error");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("收到的消息为:"+new String(message.getBody()));
};
channel.basicConsume(queue,false,callback,consumerTag -> {});
}
}
发送到 Topic 交换的消息不能具有任意的 routing_key - 它必须是一个单词列表,由点分隔。这些词可以是任何东西,但通常它们会指定一些与消息相关的特征。一些有效的路由键示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。路由键中可以有任意多个字,最多 255 个字节。
绑定 routingKey 也必须采用相同的形式。Topic 交换背后的逻辑类似于 Direct 交换 - 使用特定路由键发送的消息将被传递到使用匹配绑定键绑定的所有队列。然而,绑定键有两个重要的特殊情况:
*
可以代替一个词。#
可以替换零个或多个单词。如上图所示:
Q1–>绑定的是 :
中间带 orange 带 3 个单词的字符串(.orange.)
Q2–>绑定的是 :
最后一个单词是 rabbit 的 3 个单词(..rabbit)
第一个单词是 lazy 的多个单词(lazy.#)
package com.touwowo.seven;
import com.rabbitmq.client.Channel;
import com.touwowo.utils.RabbitMqUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> entry : bindingKeyMap.entrySet()) {
String routingKey = entry.getKey();
String message = entry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("成功发送消息:"+message);
}
}
}
如果对前面代码很熟悉的话,应该不用解释。
package com.touwowo.seven;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.touwowo.utils.RabbitMqUtils;
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机类型为 Topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q1",false,false,false,null);
// 将队列与交换机进行绑定
channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.*");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("Q1收到的消息为:"+new String(message.getBody()));
};
channel.basicConsume("Q1",true,callback,consumerTag -> {});
}
}
package com.touwowo.seven;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.touwowo.utils.RabbitMqUtils;
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q2",false,false,false,null);
channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("Q2收到的消息为:"+new String(message.getBody()));
};
channel.basicConsume("Q2",true,callback,consumerTag -> {});
}
}
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。 confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
开启发布确认模式:
channel.confirmSelect();
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布, waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。 这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
public static void confirmMessageSingal() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//设置发布确认
channel.confirmSelect();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_NUM; i++) {
String message = i + "";
channel.basicPublish("",queueName, null,message.getBytes(StandardCharsets.UTF_8));
// 每发一条消息,就等待确认,超时时间为 5s
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("单个确认发布 " + MESSAGE_NUM + " 条消息的耗时为 "+ (end - start) +" ms");
}
上述程序测试发布1000条消息所需要的时间,因为生产者必须接收到 broker 的确认才会发布下一个消息,这是非常耗费时间,效率低下,一般不采用这种方式保证消息的持久化。
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
public static void confirmMessageMuti() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//设置发布确认
channel.confirmSelect();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_NUM; i++) {
String message = i + "";
channel.basicPublish("",queueName, null,message.getBytes(StandardCharsets.UTF_8));
// 一百条确认一次
if((i+1)%100 == 0){
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("批量确认发布 " + MESSAGE_NUM + " 条消息的耗时为 "+ (end - start) +" ms");
}
与等待单个消息的确认相比,等待一批消息得到确认大大提高了吞吐量(使用远程 RabbitMQ 节点最多 20-30 次)。一个缺点是我们不知道在失败的情况下到底出了什么问题,所以我们可能不得不在内存中保留一整批来记录一些有意义的东西或重新发布消息。而且这个方案还是同步的,所以会阻塞消息的发布。
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。
代码实现:
public static void confirmMessageAysnc() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//设置发布确认
channel.confirmSelect();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
ConcurrentSkipListMap<Long,String> outstandmMap = new ConcurrentSkipListMap<>();
/**
* deliveryTag:标识已确认或已确认消息的编号。我们很快就会看到如何将它与发布的消息相关联。
* multiple:这是一个布尔值。如果为 false,则仅 nack 一条消息,如果为 true,则 nack 所有具有较小或相等序列号的消息。
*/
//确认消息后的回调函数
ConfirmCallback confirmCallback = (deliveryTag,multiple) ->{
if(multiple){
ConcurrentNavigableMap<Long, String> confirms = outstandmMap.headMap(deliveryTag,true);
confirms.clear();
}else {
outstandmMap.remove(deliveryTag);
}
System.out.println("收到消息的内容:"+deliveryTag);
};
// 发布消息失败或者拒绝消息
ConfirmCallback cancelCallback = (deliveryTag,multiple) ->{
String s = outstandmMap.get(deliveryTag);
System.out.println("未确认消息的编号:"+s);
};
channel.addConfirmListener(confirmCallback,cancelCallback);
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_NUM; i++) {
String message = i + "";
//发布前可以通过Channel.getNextPublishSeqNo()获取序列号
long nextPublishSeqNo = channel.getNextPublishSeqNo();
outstandmMap.put(nextPublishSeqNo,message);
channel.basicPublish("",queueName, null,message.getBytes(StandardCharsets.UTF_8));
}
long end = System.currentTimeMillis();
System.out.println("异步确认发布 " + MESSAGE_NUM + " 条消息的耗时为 "+ (end - start) +" ms");
}
这个难搞哦,也不知道怎么说,将就看看吧。