学习目标
MQ简介
在计算机科学中,消息队列(英语: Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系
列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的
种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。
实现
消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。
目前,有很多消息队列有很多开源的实现,包括JBoss Messaging,JORAM、Apache ActiveMQ ,SunOpen Message Queue . IBM MQ、Apache Qpid和HTTPSQS。
当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,而部分数据库如Redis、Mysql以及
phxsql 也可实现消息队列的功能。
特点
MQ是消费者-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和
JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
注意:
- AMQP,即Advanced Message Queuing Protocol,一个提供统—消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息客户端/中间件不同产品,不同的开发语言等条件的限制。
- JMS,Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。常见的消息队列,大部分都实现了JMSAPI,如ActiveMQ,Redis 以及RabbitMQ等。
优缺点
优点
应用耦合、异步处理、流量削锋
- 解耦
传统模式:
传统模式的缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入系统A还
需要修改代码,过于麻烦!
中间件模式:
中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
-
异步
传统模式:
传统模式的缺点:
一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
-
削峰
传统模式:
传统模式的缺点:
并发星大的时候,所有的请求直接怼到数据库,造成数据库连接异常
中间件模式:
中间件模式的的优点:
系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
缺点
系统可用性降低、系统复杂性增加
使用场景
消息队列,是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时
候,差不多就是需要使用消息队列的时候
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
为什么使用RabbitMQ
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如: Python ,Ruby、.NET, Java,JMS、C,PHP、
ActionScript, XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
总结如下:
- 基于AMQP协议
- 高并发(是一个容量的概念,服务器可以接受的最大任务数量)
- 高性能(是一个速度的概念,单位时间内服务器可以处理的任务数)
- 高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)
- 强大的社区支持,以及很多公司都在使用
- 支持插件
- 支持多语言
安装
安装依赖
#安装依赖项
yum install -y gcc gcc-c++ glibc-devel make ncurses-devel openssl-devel autoconf java-1.8.0-openjdk-devel git
erlang安装
erlang与rabbitmq 版本对应 参见 官网 https://www.rabbitmq.com/which-erlang.html
需要下载合适版本的Erlang。
Erlang下载地址:https://github.com/rabbitmq/erlang-rpm/releases
下载:
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.3/erlang-23.3.3-1.el8.x86_64.rpm
安装:
yum install erlang-23.3.3-1.el8.x86_64.rpm
安装RabbitMQ
3.8.16:
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.16
下载:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-3.8.16-1.el8.noarch.rpm
安装:
yum install rabbitmq-server-3.8.16-1.el8.noarch.rpm
启动
命令行窗口启动
#启动:
rabbitmq-server start
#停止
rabbitmq-server stop
推荐(后台启动):
#启动:
systemctl start rabbitmq-server.service
#停止:
systemctl stop rabbitmq-server.service
#查看状态:
systemctl status rabbitmq-server.service
#重启服务
systemctl restart rabbitmq-server
#设置为开机启动
systemctl enable rabbitmq-server
启动Web 管理插件
rabbitmq-plugins enable rabbitmq_management
添加防火墙端口
注意如果服务器是阿里云腾讯云,可能没有放行端口,需要自己放行端口
默认的端口15672:rabbitmq管理平台端口号
默认的端口5672: rabbitmq消息中间内部通讯的端口
默认的端口号25672 rabbitmq集群的端口号
#添加端口
firewall-cmd --add-port=5672/tcp --permanent
firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --add-port=25672/tcp --permanent
#重新加载配置
firewall-cmd --reload
# 防火墙启动关闭
systemctl start firewalld.service
systemctl stop firewalld.service
systemctl enable firewalld.service
# 查看防火墙端口列表
firewall-cmd --permanent --list-port
web管理插件访问地址
自己的服务器ip
默认用户名:guest
默认密码:guest
发现登陆不了,因为默认用户只能在本地登陆
添加用户
#添加用户
rabbitmqctl add_user username password
#设置admin权限
rabbitmqctl set_user_tags username administrator
登陆
使用新建用户登录http://ip:15672/即可
常用命令
# 添加用户
sudo rabbitmqctl add_user <username> <password>
# 删除用户
sudo rabbitmqctl delete_user <username>
# 修改用户密码
sudo rabbitmqctl change_password <username> <newpassword>
# 清除用户密码(该用户将不能使用密码登陆,但是可以通过SASL登陆如果配置了SASL认证)
sudo rabbitmqctl clear_password <username>
# 设置用户tags(相当于角色,包含administrator,monitoring,policymaker,management)
sudo rabbitmqctl set_user_tags <username> <tag>
# 列出所有用户
sudo rabbitmqctl list_users
# 创建一个vhosts
sudo rabbitmqctl add_vhost <vhostpath>
# 删除一个vhosts
sudo rabbitmqctl delete_vhost <vhostpath>
# 列出vhosts
sudo rabbitmqctl list_vhosts [<vhostinfoitem> ...]
# 针对一个vhosts给用户赋予相关权限;
sudo rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 清除一个用户对vhosts的权限;
sudo rabbitmqctl clear_permissions [-p <vhostpath>] <username>
# 列出哪些用户可以访问该vhosts;
sudo rabbitmqctl list_permissions [-p <vhostpath>]
# 列出用户访问权限;
sudo rabbitmqctl list_user_permissions <username>
web管理
功能说明
RabbitMQ专业术语
connection是什么,与TCP连接有什么关系?
- connection代表着一条真实的TCP连接,即物理连接,具有ip和port
- 该连接与RabbitMq服务器相连,大部分场景下设计时采用长连接
- 可以配置心跳来检测连接状态
- connection是线程安全的
- 创建connection的代价相对较高(其实也不高),且当connection较多时,RabbitMq服务的负荷会变高
- 使用完毕需要关闭
channel是什么,为什么需要使用它?
- 可以把它看作是虚拟的连接,这里的虚拟是相对于connection的真实连接而言
- 该虚拟连接处于一个connection的内部,可以把它看作是connection上的session
- 在channel内进行真实的数据传输,所有的RabbitMq指令都是在channel内进行的
- 之所以使用channel,是因为创建真实的TCP连接开销较大,且真实连接太多时会增大RabbitMq服务的负荷
- channel不是线程安全的,一般每个线程维护自己的channel
- channel设计时就是短暂的,一旦该channel上出现错误,就会关闭,后续操作也不会成功
- 使用完毕需要关闭
channel和connection之间到底是什么关系?
- 在应用中,先创建connection,然后在该connection上创建channel,在channel上传输数据
- 可以在一个connection内创建多个channel,协议支持最大值为65535,推荐默认值为2047
- 其它内容可以参考对channel的解释
Exchange是什么?它是如何工作的?
- 生产者发送的消息全部都发到Exchange,而非直接到queue
- 负责从生产者接收消息,然后通过路由规则把消息路由到相应的queue
- 可以使用默认的Exchange,这里生产者和消费者共享一个同名的queue来传递消息
- 创建Exchange时指定名称和类型,类型通常是:dircet, topic, header, fanout,类型决定了路由规则
- 生产者把消息publish到指定的Exchange,并指定routing_key,路由规则根据routing_key进行路由
- 消费者创建一个queue,并使用routing_key把queue绑定到Exchange,以接收从Exchange路由到该queue的消息
queue是什么?
- 存放消息的队列,消息的传递必须进入到某个queue,且消息在queue中被消费
- 消费者创建queue,并把queue绑定到Exchange,依据绑定的routing_key接收消息
- 可以创建匿名的queue,由RabbitMq随机命名,使用完毕自动删除,queue命名时最长255字节(utf-8字符)
- 可以在创建queue时,设置消息持久化,这样RabbitMq重启后,queue依然存在
- Exclusive 属性决定了该queue只能被一个connection使用,且该connection关闭后自动删除queue
bind是什么?为什么要bind?
- 生产者发布的消息都通过Exchange路由,不会直接发送给消费者
- 消费者要想取得消息,必须把queue绑定到Exchange,这样Exchange才知道需要把消息路由到这个queue
- 绑定时需要指定Exchange、queue和routing_key
- bind是一种把queue和Exchange连接起来的方式
总体数据流是怎样的?
如图所示:
- 生产者发布消息到Exchange
- Exchange接收消息并负责路由消息
- 必须有queue已经binding到了Exchange,示例中Exchange根据路由规则把消息路由到两个queue(由Exchange的类型和routing_key决定)
- 消息会驻留在queue中,直到被消费者处理
- 最终消息被消费者消费
简单队列模式
在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列——RabbitMQ 代表消费者保留的消息缓冲区。
生产者将消息发送到“hello”队列。消费者从该队列接收消息。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.aisencode</groupId>
<artifactId>rabbitmqtest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmqtest</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!--rabbitmq-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
</dependencies>
</project>
发送方
package cn.aisencode.test01_simple.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 简单队列-消息生产者
*/
public class Send {
//定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
try (
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel()) {
/**
* 声明队列
* 第一个参数queue:队列名称
* 第二个参数durable:是否持久化
* 第三个参数Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
* 这里需要注意三点:
* 1.排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
* 2."首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
* 3.即使该队列是持久化的,一旦连接关闭或者客户端退出,i该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
* 第四个参数Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
/**
* 发送消息
* 第一个参数:交换机(为空,使用默认交换机)
* 第二个参数:路由名(交换机为空,传入的队列名会转成路由名)
* 第三个参数:携带额外参数
* 第四个参数:消息的实体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
接收方
package cn.aisencode.test01_simple.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 简单队列-消息消费者
*/
public class Recv {
//定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//监听队列,消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
Work queue:工作队列模式
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反,我们安排任务稍后完成。我
们将一个任务封装 成一条消息并发送到队列中。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工人时,任务将在他
们之间共享。
轮询模式
发送方
package cn.aisencode.test02_work.rr.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 工作队列-轮询-消息生产者
* 相比简单队列解决消息更快,一个消费者一条的消费
* 但受限于消费者的性能,若一台慢另一台仍需等待
*/
public class Send {
//定义队列名称
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
try (
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel()) {
/**
* 声明队列
* 第一个参数queue:队列名称
* 第二个参数durable:是否持久化
* 第三个参数Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
* 这里需要注意三点:
* 1.排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
* 2."首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
* 3.即使该队列是持久化的,一旦连接关闭或者客户端退出,i该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
* 第四个参数Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 20; i++) {
String message = "Hello World!" + i;
/**
* 发送消息
* 第一个参数:交换机
* 第二个参数:路由名
* 第三个参数:携带额外参数
* 第四个参数:消息的实体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
接收方1
package cn.aisencode.test02_work.rr.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 工作队列-轮询-消息消费者
*/
public class Recv01 {
//定义队列名称
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
/**
* 手动回执
* multiple是否确认多条
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
接收方2
package cn.aisencode.test02_work.rr.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 工作队列-轮询-消息消费者
*/
public class Recv02 {
//定义队列名称
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
/**
* 手动回执
* multiple是否确认多条
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
公平模式
发送方
package cn.aisencode.test02_work.fair.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 工作队列-公平-消息生产者
* 相比工作队列-轮询解决消息更快,能者多劳
*
*/
public class Send {
//定义队列名称
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
try (
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel()) {
/**
* 声明队列
* 第一个参数queue:队列名称
* 第二个参数durable:是否持久化
* 第三个参数Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
* 这里需要注意三点:
* 1.排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
* 2."首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
* 3.即使该队列是持久化的,一旦连接关闭或者客户端退出,i该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
* 第四个参数Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 20; i++) {
String message = "Hello World!" + i;
/**
* 发送消息
* 第一个参数:交换机
* 第二个参数:路由名
* 第三个参数:携带额外参数
* 第四个参数:消息的实体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
接收方1
package cn.aisencode.test02_work.fair.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 工作队列-公平-消息消费者
*/
public class Recv01 {
//定义队列名称
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//公平模式,限制消费者每次只能消费一条,处理完才能接受下一条
int prefetchCount = 1;
channel.basicQos(prefetchCount);
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
/**
* 手动回执
* multiple是否确认多条
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
接收方2
package cn.aisencode.test02_work.fair.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 工作队列-公平-消息消费者
*/
public class Recv01 {
//定义队列名称
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//公平模式,限制消费者每次只能消费一条,处理完才能接受下一条
int prefetchCount = 1;
channel.basicQos(prefetchCount);
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
/**
* 手动回执
* multiple是否确认多条
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
Publish/Subscribe:消息的发布与订阅模式队列
工作队列背后的假设是每个任务都被交付给一个工人。在这一部分,我们将做一些完全不同的事情——我们将向多个消费者传递一条消息。这种模式被称为“发布/订阅”。
发送方
package cn.aisencode.test03_fanout.send;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 订阅队列-消息生产者
* 每个消费者都获得相同的信息
* 生成的是排他队列,自动删除
*/
public class Send {
//定义交换机名称
private final static String Exchange_NAME = "exchange_fanout";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
try (
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel()) {
//绑定交换机
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.FANOUT);
String message = "Hello World!";
/**
* 发送消息
* 第一个参数:交换机
* 第二个参数:路由名
* 第三个参数:携带额外参数
* 第四个参数:消息的实体
*/
channel.basicPublish(Exchange_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
接收方1
package cn.aisencode.test03_fanout.recv;
import com.rabbitmq.client.*;
/**
* 发布订阅-消息消费者
*/
public class Recv01 {
//定义队列名称
private final static String Exchange_NAME = "exchange_fanout";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.FANOUT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,Exchange_NAME,"");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
接收方2
package cn.aisencode.test03_fanout.recv;
import com.rabbitmq.client.*;
/**
* 发布订阅-消息消费者
*/
public class Recv02 {
//定义队列名称
private final static String Exchange_NAME = "exchange_fanout";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.FANOUT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,Exchange_NAME,"");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
Routing:路由模式队列
发送方
package cn.aisencode.test04_direct.send;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 路由队列-消息生产者
* 根据不同的消费者,分发信息
* 生成的是排他队列,自动删除
* 缺点:项目越来越大,路由key难以管理
*/
public class Send {
//定义交换机名称
private final static String Exchange_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
try (
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel()) {
//绑定交换机,选择DIRECT
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.DIRECT);
String infoMessage = "普通消息!";
String warnMessage = "警告消息!";
String errorMessage = "错误消息!";
String infoRoutingKey = "info";
String warnRoutingKey = "warn";
String errorRoutingKey = "error";
/**
* 发送消息
* 第一个参数:交换机
* 第二个参数:路由名
* 第三个参数:携带额外参数
* 第四个参数:消息的实体
*/
channel.basicPublish(Exchange_NAME, infoRoutingKey, null, infoMessage.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + infoMessage + "'");
channel.basicPublish(Exchange_NAME, warnRoutingKey, null, warnMessage.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + warnMessage + "'");
channel.basicPublish(Exchange_NAME, errorRoutingKey, null, errorMessage.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + errorMessage + "'");
}
}
}
接收方1
package cn.aisencode.test04_direct.recv;
import com.rabbitmq.client.*;
/**
* ;路由队列-消息消费者
*/
public class Recv01 {
//定义队列名称
private final static String Exchange_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.DIRECT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
String errorRoutingKey = "error";
channel.queueBind(queueName,Exchange_NAME,errorRoutingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
接收方2
package cn.aisencode.test04_direct.recv;
import com.rabbitmq.client.*;
/**
* 路由队列-消息消费者
*/
public class Recv02 {
//定义队列名称
private final static String Exchange_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.DIRECT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
String infoRoutingKey = "info";
String warnRoutingKey = "warn";
String errorRoutingKey = "error";
channel.queueBind(queueName,Exchange_NAME,infoRoutingKey);
channel.queueBind(queueName,Exchange_NAME,warnRoutingKey);
channel.queueBind(queueName,Exchange_NAME,errorRoutingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
Topic:主题模式队列
发送方
package cn.aisencode.test05_topic.send;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 主题队列-消息生产者
* 根据不同的消费者,分发信息
* 生成的是排他队列,自动删除
* 以通配符管理路由名
* 发送用完整路由名
* 接收用通配符
* * 1一个单词
* # 0个或多个单词
* 使用最多的模式
*/
public class Send {
//定义交换机名称
private final static String Exchange_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
try (
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel()) {
//绑定交换机,选择DIRECT
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.TOPIC);
String infoMessage = "普通消息!";
String warnMessage = "警告消息!";
String errorMessage = "错误消息!";
String infoRoutingKey = "info.message.orange";
String warnRoutingKey = "orange.warn.message";
String errorRoutingKey = "error.rabbit.message";
/**
* 发送消息
* 第一个参数:交换机
* 第二个参数:路由名
* 第三个参数:携带额外参数
* 第四个参数:消息的实体
*/
channel.basicPublish(Exchange_NAME, infoRoutingKey, null, infoMessage.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + infoMessage + "'");
channel.basicPublish(Exchange_NAME, warnRoutingKey, null, warnMessage.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + warnMessage + "'");
channel.basicPublish(Exchange_NAME, errorRoutingKey, null, errorMessage.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + errorMessage + "'");
}
}
}
接收方1
package cn.aisencode.test05_topic.recv;
import com.rabbitmq.client.*;
/**
* ;主题队列-消息消费者
*/
public class Recv01 {
//定义队列名称
private final static String Exchange_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.TOPIC);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
String errorRoutingKey = "#.message.#";
channel.queueBind(queueName,Exchange_NAME,errorRoutingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
接收方2
package cn.aisencode.test05_topic.recv;
import com.rabbitmq.client.*;
/**
* 主题队列-消息消费者
*/
public class Recv02 {
//定义队列名称
private final static String Exchange_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建通信
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(Exchange_NAME, BuiltinExchangeType.TOPIC);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
String routingKey = "*.rabbit.*";
channel.queueBind(queueName,Exchange_NAME,routingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/**
* 回调
* 消费者标签
* 消息
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//模拟消费超时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//监听队列,消费消息autoAck 自动回执(收到消息自动回复收到了)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
RPC:远程过程调用模式队列
使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有任何值得分配的耗时任务,我们将创建一个返回斐波那契数列的虚拟 RPC 服务。
服务器端
package cn.aisencode.test06_rpc.server;
import com.rabbitmq.client.*;
/**
* 远程过程调用(RPC)
* 前面我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。
* 如果我们需要在远程计算机上运行一个函数并等待结果呢?嗯,这是一个不同的故事。这种模式通常称为远程过程调用或RPC。
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
/**
* 计算斐波那契数列
* @param n
* @return
*/
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
try (Connection connection = factory.newConnection();
//通过连接创建通道
Channel channel = connection.createChannel()) {
//声明队列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
//限制RabbitNQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
//获取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//获取replyTo队列和correlationId请求标识
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
//接收客户端消息
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
//服务端根据业务需求处理
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
//将处理结果发送至replyTo队列同时携带correlationId属性
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
//手动回执
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
//RabbitMq消费者工作线程通知RPC服务器其他所有线程运行
synchronized (monitor) {
monitor.notify();
}
}
};
/**
* 监听队列
* autoAck = true代表自动确认消息
* autoAck = false代表手动确认消息
*/
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
//线程等待并准备接收来自RPC客户端的消息
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
客户端
package cn.aisencode.test06_rpc.client;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
//请求服务端
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
//请求服务端
public String call(String message) throws IOException, InterruptedException {
//correlationId 请求标识id
final String corrId = UUID.randomUUID().toString();
//获取队列名称
String replyQueueName = channel.queueDeclare().getQueue();
//设置replyTo队列 和请求标识
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
//发送消息至队列
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//设置公平模式,线程等待,每次只接收一个响应
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
//接收服务端返回结果
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
// 将给定的元素在给定的时间设置到线程队列中,如果设置成功返回true,否则false
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
//从线程队列中获取值,如果线程队列中没有值,线程会一直阻塞,直到线程队列中有值,并且取得该值
String result = response.take();
//从消息队列中丢弃该值
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
//关闭连接
connection.close();
}
}
RabbitMQ:事务机制
在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问
题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布
操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之
前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决
呢?
RabbitMQ为我们提供了两种方式:
- 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
- 通过将channel设置成confirm模式来实现;
AMQP事物机制控制
RabbitMQ中与事务机制有关的方法有三个: txSelect( ) , txCommit()以及 txRollback(), txSelect()用于将当前channel设置成transaction
模式, txCommit( )用于提交事务,txRollback( )用于回滚事务,在通过txSelect()开启事务之后,我们便可以发布消息给broker代理服务器
了,如果txCommit()提交成功了,则消息一定到达了broker了,如果在txCommit()执行之前broker异常崩溃或者由于其他原因抛出异
常,这个时候我们便可以捕获异常通过txRollback ()回滚事务。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 事务-消息生产者
*/
public class Send {
//定义队列名称
private final static String QUEUE_NAME = "tx";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
Connection connection = null;
Channel channel = null;
try {
//连接工厂创建连接
connection = factory.newConnection();
//创建通信
channel = connection.createChannel();
//开启事务
channel.txSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
//手动异常
int i = 10 / 0;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
//提交事务
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
System.out.println(e.getMessage());
//回滚事务
channel.txRollback();
}
}
}
事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获
异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知
道消息已经正确送到.又能基本上不带来性能上的损夫呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方
案,即将channel信道设置成confirm模式。
Confirm:确认模式
通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐,此时处理AMQP协
议层面能够实现消息事物控制外,我们还有第二种方式即Confirm模式。
原理
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一
旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者〈包含消息的唯一lD) ,这就使得生产者知道消息已经正确到达目
的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包
含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消更都已经得到了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,
当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发
送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
在channel被设置成 confirm模式之后,所有被publish的后续消息都将被confirm (即 ack)或者被nack一次。但是没有对消息被confirm
的快慢做任何保证,并且同一条消息不会既被confirm又被nack 。
注意两种事物控制形式不能同时开启
Confirm确认机制代码实现
实现生产者confirm机制有三种方式:
普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm。
批量confirm模式:每发送一批消息后,调用waitForConfirmsorDie()方法,等待服务器端confirm。
异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
同步Confirm
package cn.aisencode.test08_confirm.sync.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 确认模式-同步-消息生产者
*/
public class Send {
//定义队列名称
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置host
factory.setHost("47.94.211.83");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yebAdmin");
factory.setVirtualHost("/yeb");
Connection connection = null;
Channel channel = null;
try {
//连接工厂创建连接
connection = factory.newConnection();
//创建通信
channel = connection.createChannel();
//开启确认模式
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
//普通confirm模式
/*if (channel.waitForConfirms()) {
System.out.println("消息发送成功");
}*/
//批量confirm模式,只要一条未确定就抛异常
channel.waitForConfirmsOrDie();
System.out.println("消息发送成功");
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
以上代码可以看出,使用同步的方式需要等所有的消息发送成功以后才会执行后面代码,只要有一个消息未被确认就会抛出IO异常。解
决办法可以使用异步确认。
异步Confirm
异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),
我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck 方法,
unconfirm集合删掉相应的一条(multiple=false〉或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序
集合SortedSet存储结构。实际上, waitForConfirms()方法也是通过SortedSet维护消息序号的。
Spring集成RabbitMQ
父工程
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<version>2.5.0</version>
<artifactId>spring-boot-starter-parent</artifactId>
<relativePath/>
</parent>
<modules>
<module>amqp-send</module>
<module>qmqp-recv</module>
</modules>
<groupId>cn.aisencode</groupId>
<artifactId>springamqpdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
</dependencies>
</project>
子工程-发送方
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.aisencode</groupId>
<artifactId>amqp-send</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>cn.aisencode</groupId>
<artifactId>springamqpdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
配置文件
application.yml
spring:
rabbitmq:
host: 47.94.211.83
port: 5672
username: yeb
password: yebAdmin
virtual-host: /yeb
server:
port: 8081
配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Aisen
* @time 2021.6.10 10:20
*/
@Configuration
public class RabbitMQConfig {
//声明队列
@Bean
public Queue queue() {
return new Queue("amqp_test");
}
//声明交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("amqp_exchange");
}
//绑定交换机和队列
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("*.amqp.#");
}
}
启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Aisen
* @time 2021.6.10 10:07
*/
@SpringBootApplication
public class SendApplication {
public static void main(String[] args) {
SpringApplication.run(SendApplication.class,args);
}
}
测试类
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Aisen
* @time 2021.6.10 10:28
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class SendTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
String message = "Hello world!";
System.out.println("发送消息:" + message);
//发送消息,最简单的方式
//rabbitTemplate.convertAndSend(message);
//指定交换机,路由
rabbitTemplate.convertAndSend("amqp_exchange","test.amqp",message);
}
}
子工程-接收方
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.aisencode</groupId>
<artifactId>qmqp-recv</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>cn.aisencode</groupId>
<artifactId>springamqpdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
配置文件
application.yml
spring:
rabbitmq:
host: 47.94.211.83
port: 5672
username: yeb
password: yebAdmin
virtual-host: /yeb
server:
port: 8082
启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Aisen
* @time 2021.6.10 10:07
*/
@SpringBootApplication
public class RecvApplication {
public static void main(String[] args) {
SpringApplication.run(RecvApplication.class,args);
}
}
测试类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author Aisen
* @time 2021.6.10 10:37
*/
@Component
@RabbitListener(queues = "amqp_test")
public class RecvTest {
//监听获取到的消息后的处理方法
@RabbitHandler
public void testRecv(String message) {
System.out.println("接收到消息:" + message);
}
}
测试
启动发送方测试类
启动接收方启动类