rabbitmq
前言
各种mq产品不统一,导致无法联合组成单一的总线,java程序只需要对JMS API编程,使用合适的MQ驱动,隐藏其中实际接口,但是单独化标准型接口胶合众多不同接口,会使JMS程序很脆弱。
概念
AMQP协议
- Module Layer:定义供客户端调用命令,如Basic.Consume,Queue.declare
- Session Layer:负责客户端和服务器的通信,可靠性同步机制和错误处理
- Transport Layer:传输二进制数据流,提供帧处理,信道复用,错误检测,数据表示
AMQP命令
信道channel
建立一条TCP链接,创建N多条信道(一个线程一个channel实例,多线程共享channel非安全),发送AMQP命令,复用连接,减少tcp连接创建开销
元数据
- 队列元数据–队列名称和他们的属性(是否持久化)
- 交换器元数据–交换器名称,类型和属性
- 绑定元数据–一张简单的表格展示如何将消息路由到队列
- Vhost元数据–为vhost内的队列,交换器和绑定提供命名空间和安全属性
交换器
交换器不过是一张查询表,创建新的交换器不过是将查询表添加到集群所有节点上,集群中每个节点拥有每个交换器所有信息,节点故障时,只需生产者 重新连接到集群上,而发出过程未到达队列中时的消息通过confirm模式确保消息不丢失
类型
- direct:消息路由到routingkey和bindingkey完全匹配的队列中
- faout: 消息投递给所有与交换器绑定的队列中
- topic: 通过模糊匹配(“.”,“*”,“#”),根据Exchange,找到对应routingkey,routingkey通过bind的队列(queue)将消息入队
- header:已淘汰
- vhost:虚拟消息服务器(mini版rabbit服务器,拥有独立的队列,交换器,绑定),权限控制单位
是否持久化
持久化将交换器存盘,服务器重启不会丢失exchange信息
是否自动删除
当至少一个队列或交换器与该交换器绑定,之后当所有与该exchange绑定的队列/交换器都解绑,自动删除交换器
是否内置
内置交换器,客户端无法将消息直接发送到这个交换器中,只能通过交换器路由到交换器
队列
集群中不是每一个节点都有所有队列完全拷贝,其中一个节点挂掉,该节点关联的队列和绑定都会消失,其中没有持久化的队列会被消费者重新声明创建,持久化的队列只有等待重启
是否持久化
持久化将交换器存盘,服务器重启不会丢失exchange信息
是否排它
该队列仅对第一次声明它的连接可见,且连接断开自动删除。其他连接不允许建立同名排它队列。一个客户端同时发送,读取消息应用场景
是否自动删除
当至少有一个消费者曾连接到该队列,之后所有与该queue连接的的消费者都断开时,队列自动删除。
生产中需要对队列的流量,内存,网卡占用有清晰认知,预估平均值和峰值,以便合理分配硬件资源
rabbitmq不将队列内容和状态复制到所有节点原因
- 存储空间
- 性能 消息发布需要将消息复制到每一个节点,持久化消息会触发磁盘活动
消息生产
channel.basicPublish:
- mandatory:,当exchange无法根据自身和路由键找到合适队列时,为true,消息会返回生产者;为false,直接丢弃。生产者通过addReturnListener添加监听器监听返回的消息
- immediate:交换器发送消息到队列,队列不存在任何消费者时,为true:消息不会入队且返回消息给生产者; (3.0已取消,官方回答影响队列性能,增加代码复杂性)
备份交换器
未设置mandatory,会导致消息丢失,设置了,客户端需添加 addReturnListener。通过声明channel.exchangeDeclare方法alternate-exchange参数 使用备份交换器将未被路由消息存储RabbitMQ中。
Map<String,Object> args = new HashMap();
args.put("alternate-exchange","myAe");
channel.exchangeDeclare("nomalExchange","direct",true,false,args);
channel.exchangeDeclare("myAe","fanout",true,false,null);
过期消息TTL
- 通过Policy方式
- 通过HTTP API接口
- 通过队列属性设置
Map<String,Object> args = new HashMap(); args.put("x-message-ttl",6000);// 设置为0立即丢掉 channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
- 通过消息本身 ```java // 该方式一旦消息过期,会从队列丢弃 AMOP.BasicProperties.Builder builder = new AMOP.BasicPropertiesBuilder.Builder(); builder.expiration(“60000”);// 设置TTL=6000ms channel.basicPublish(builder.build())
// 该方式一旦消息过期,不会立即丢弃,等消费时判断过期再丢弃 随机时间消息遍历带来开销 AMOP.BasicProperties properties = new AMOP.BasicPropertiesBuilder(); builder.expiration(“60000”);// 设置TTL=6000ms channel.basicPublish(properties)
### **队列过期TTL**
```java
Map<String,Object> args = new HashMap();
args.put("x-expires",6000); // 队列删除前处于未使用状态时间(没任何消费者,未被重新声明,未调用Basic.Get) 设置为0立即丢掉
channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
DLX死信队列
生产者发送消息到队列A,队列A没有消费者消费,过了过期时间,消息被重新转发到死信队列进行处理
- 消息被拒绝,Reject或Nack,且requeue为false
- 消息过期
- 队列最大
channel.exchangeDeclare("dlx_exchange","direct");
Map<String,Object> args = new HashMap();
args.put("x-dead-letter-exchange","dlx_exchange");
args.put("x-dead-letter-routing-key","dlx-routing-key");
channel.queueDeclare("myqueue",false,false,false,args);
优先级队列
设置队列最大优先级
Map<String,Object> args = new HashMap();
args.put("x-max-priority", 10);
channel.queueDeclare("myqueue",true,false,false,args);
设置消息优先级
// 该方式一旦消息过期,会从队列丢弃
AMOP.BasicProperties.Builder builder = new AMOP.BasicPropertiesBuilder.Builder();
builder.priority(5);
channel.basicPublish("exchange_priority","rk_priority",builder.build(),("messages").getBytes());
消费者消费速度大于生产者速度,Broker没有堆积情况下,优先级失效
消息消费
RabbitMQ消费模式为两种:
- 推模式:Basic.Consume
-
拉模式:Basic.Get。需手动确认ACK
- 自动确认:设置是否自动确认
- Qos:推送消息个数限制
- 消费者标签:用来区分多个消费者
- noLocal:同一个连接生产者不可传动到该connection消费者
- 是否排它
- 消费回调函数
- handleShutdownSignal:channel或connection关闭时调用函数
- handleCancelOk\channel.basicCancel:消费端显式或隐式取消订阅
- handleShutdownSignal:channel或connection关闭时调用函数
消息确认机制
- autoAck:MQ自动把发出去消息置为确认,然后从内存删除,不管消费者是否真正消费到消息
- manulAck:MQ等待消费者显式回复确认信号才从内存(磁盘)移去消息(打标,删除)。未手动确认,断开connection,消息重新入队
消息拒绝
- Basic.Reject:拒绝该消息,重新入队或从队列中删除该消息
消息分发
多消费者时,轮询方式分发消息到消费者。channel.basicQos运行信道消费者保持最大未确认消息数量。均衡了消费者任务(对pull模式无效)
一个信道可以消费多个队列,设置prefetchCount 大于 0 ,信道需要和各个队列协调确保发送的消息都没有超过所限定的prefetchCount的值,使 RabbitMQ 的性能降低,尤其是队列分散集群多个 Broker 节点。
消息投递流程
生产者把消息发布到交换器(Exchange)上,绑定决定如何从路由器到达队列,队列中消息被消费者消费。
生产者生产消息
- 生产者连接到broker,建立connection,开启channel
- 生产者声明exchange,设置类型,持久化等属性
- 生产者声明queue,设置排它,持久化,自动删除属性
- 生产者通过routingkey将exchange和queue进行绑定
- 生产者消息发送
- 交换器根据路由键查找匹配队列
- 找到,存入队列;没找到,选择丢弃或返回生产者
- 关闭channel
- 关闭connection
消费者消费消息
- 消费者连接到broker,建立connection,开启channel
- 消费者请求消费相应队列中消息,设置回调函数
- 等到broker回应,投递消息,消费者接受消息
- 消费者ACK消息
- mq删除队列中对应消息
- 关闭channel
- 关闭connection
RPC
TODO
特点
- 多个消费者消费一条队列,一条消息只会被一个消费者消费。
- 消费者收到一条消息后确认之前断开连接(或者取消订阅),rabbitmq会把这条消息分发给其他消费者
- 消费者收到一条消息后忘记确认,rabbitmq不会向该消费者分发更多消息(延迟确认)
- 当消费者拒绝接受消息时,设置reject的参数queue为true,rabbitmq会把这条消息分发给其他消费者;如果设置为false,rabbitmq则会认为该消息格式错误,会马上从队列中删除。
- 消费者,生成者都能使用amqp的queue.declare创建队列(passive为true,队列若存在,会成功返回,false会返回一个错误)
- exclusive设置为true,队列将为私有,为一个消费者消费,
- auto-delete 当最后一个消费者取消订阅时,队列会自动移除
- 生产者发出的消息如果路由到了不存在的队列,rabbitmq会忽略消息
消息持久化
- 交换器持久化:声明队列时,durable设置为true
- 队列持久化:声明队列时,durable设置为true
- 消息持久化:BasicProperties deliveryMode设置为2实现消息队列化
消息和队列应该同时设置持久化,否则毫无意义。应分场景设置持久化,毕竟磁盘IO操作开销远大于内存操作
RabbitMQ并不会每条消息都同步存盘,系统flush操作,从而导致极端情况消息丢失,通过1镜像队列来提高可靠性2引入事务机制或发送方确认机制保证持久化
事务机制(性能损失)
channel.txSelect();信道设置为事务模式
channel.txCommit();用于提交事务
channel.txRollback();用于事务回滚
发送方确认机制(异步,轻量级)消息,队列持久化,写入磁盘后进行确认
channel.confimSelect();设置信道为confim模式,消息指定唯一ID
channel.basicPublish("exchange","routingKey",null,("messages").getBytes());
if(!channel.waitForConfirms()) { // 必须和confimSelect配套出现
// send message fail
}
事务机制和发送方确认机制两者互斥 事务机制和发送方确认机制配合mandatgory或备份交换器保证传输可靠性
批量confirm
能够提高性能。通过定期,定量发送多少条消息后调用waitForConfirm等待ACK确认,但会导致一条返回nack或超时,所有消息需重发。减缓性能
try {
channel.confirmSelect();
int MsgCount = 0;
while (true) {
channel.basicPublish("exchange", "routingKey",
null, "batch confirm test".getBytes());
//将发送出去的消息存入缓存中,缓存可以是
// 一个 ArrayList 或者 BlockingQueue 之类的
if (++MsgCount >= BATCH_COUNT) {
MsgCount = 0;
try {
if (channel.waitForConfirms()) {
//将缓存中的消息清空
}
//将缓存中的消息重新发送
} catch (InterruptedException e) {
e.printStackTrace();
//将缓存中的消息重新发送
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
异步confirm
通过channel.addConfirmListener添加监听器,重写HandleAck和HandleNack方法
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple)
throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag
+ ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple)
throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
//注意这里需要添加处理消息重发的场景
}
});
//下面是演示一直发送消息的场景
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}
消息顺序性
网络中断,事务失败,死信,异步都会导致,通过全局id协调
消息可靠性
消息中间件传输保障三等级
- at most once:最多一次
- at least once:最少一次
- exactly once:恰好一次
RabbitMQ支持“最多一次”和“最少一次”。后者需事务/confirm机制,配置mandatory或备份交换器,加上持久化处理,加上消费者手动 ACK
RabbitMQ 实现exactly once:在最少一次基础上在消费端实现**业务的幂等性
安装与运行
通过docker安装,部署
// 下载镜像
docker pull rabbitmq:3.7.16-management
// 启动容器
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 5672:5672 -p 15672:15672 <container id>
多租户,权限
erlang虚拟机简称节点,多个erlang程序可以运行在同一个节点,节点之间可以通信。 vhost本质一个独立rabbitmq服务器,拥有自己队列,交换器,绑定,权限。系统默认 “/” vhost
// 创建vhost
rabbitmqctl add_vhost vhost1
// 查看vhost
rabbitmqctl list_vhosts vhost1
// 删除vhost
rabbitmqctl delete_vhost vhost1
// vhost 授权用户名 可配置资源 可写资源 可读资源
rabbitmqctl set_permissions -p /vhost1 user_admin '.*' '.*' '.*'
// 清除权限
rabbitmqctl clear_permissions -p /vhost1 user_admin
// 列举权限
rabbitmqctl list_permissions vhost1
// 列举用户权限
rabbitmqctl list_user_permissions user_admin
// 创建用户
rabbitmqctl add_user root root123
// 修改用户密码
rabbitmqctl change_password root root
// 清除用户密码
rabbitmqctl clear_password root
// 删除用户
rabbitmqctl delete_user root
// 列举用户
rabbitmqctl list_users
// 设置用户角色
rabbitmqctl set_user_tags root admin
- 启动rabbitmq以及整个erlang节点
- 安装目录./sbin运行./rabbitmq-server,
- /var/log/rabbitmq/下查看日志,
- ./sbin/rabbitmqctl stop 停止 rabbitmq
- rabbitmq中队列,交换器和绑定原数据都是保存到minesia(内建erlang非sql型数据库)
- dump_log_write_threshold,日志文件转储真实数据库文件额度
检查
rabbit编码和模式
解耦:
- 异步状态,分离请求和动作
- 提供扩展性
- 零成本api(消息通信)
发后即忘模型:
- 批处理
- 通知
- 发送警告,并行处理
rabbitmq实现RPC等待响应,私有队列和发送确认 AMQP有reply_to,生产者通过该字段可确认队列,并监听队列等待应答
rabbitmq集群:运行消费者和生产者rabbit节点奔溃情况下继续运行,通过添加更多节点来线性扩展消息通信吞吐量。
Rabbitmq manage
存放 $RABBITMQ_HOME/plugins
目录下
指标
- Publish:producer发布到Exchange成功的消息数
- Publisher confirm:producer发布到Exchange成功且confirm数
- Deliver (manual ack):消息进入queue成功数(手动ack)
- Deliver (auto ack):消息进入queue成功数(自动ack)
- Consumer ack:consumer消费消息成功数
- Redelivered:consumer消费消息,返回Nack或reject,重新进入queue数
- Get (manual ack):手动获取队列消息
- Get (auto ack) :
- Disk read:消息磁盘读取
- Disk write:消息磁盘持久化写入
web管理rabbitmq
rabbitmq-plugins enable rabbitmq_management –启用插件
默认用户guest 只允许localhost登录。
(1)超级管理员(administrator)
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
(2) 监控者(monitoring)
可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
(3) 策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
与administrator的对比,administrator能看到这些内容
(4) 普通管理者(management)
应用管理
// 停止运行 RabbitMQ 的 Erlang 虚拟机和 RabbitMQ 服务应用 pid_file存放于 Mnesia 目录中
rabbitmqctl stop [pid_file]
rabbitmqctl shutdown
// 停止 RabbitMQ 服务应用
rabbitmqctl stop_app
// 启动 RabbitMQ 应用
rabbitmqctl start_app
// 等待 RabbitMQ 应用的启动
rabbitmqctl wait [pid_file]
// RabbitMQ 节点重置还原到最初状态
rabbitmqctl reset
// 强制将 RabbitMQ 节点重置还原到最初状态
rabbitmqctl force_reset
// RabbitMQ 节点轮换日志文件
rabbitmqctl rotate_logs {suffix}
// 将节点加入指定集群中
rabbitmqctl join_cluster {cluster_node} [--ram]
// 显示集群的状态
rabbitmqctl cluster_status
// 修改集群节点的类型
rabbitmqctl change_cluster_node_type {disc|ram}
// 将节点从集群中删除
rabbitmqctl forget_cluster_node [--offline]
// 在集群中的节点应用启动前咨询 clusternode 节点的最新信息,并更新相应的集群信息
rabbitmqctl update_cluster_nodes {clusternode}
// 确保节点可以启动,即使它不是最后一个关闭的节点
rabbitmqctl force_boot
// 指示未同步队列 queue 的 slave 镜像可以同步 master 镜像行的内容
rabbitmqctl sync_queue [-p vhost] {queue}
// 取消队列 queue 同步镜像的操作
rabbitmqctl cancel_sync_queue [-p vhost] {queue}
// 设置集群名称
rabbitmqctl set_cluster_name {name}
// 队列状态 queueinfoitem可以是很多参数,
rabbitmqctl list_queues [-p vhost] [queueinfoitem ...]
// 交换器状态
rabbitmqctl list_exchanges [-p vhost] [exchangeinfoitem ...]
// 连接状态
rabbitmqctl list_bindings [-p vhost] [exchangeinfoitem ...]
// 信道状态
rabbitmqctl list_channels [-p vhost] [exchangeinfoitem ...]
// 消费者状态
rabbitmqctl list_consumers [-p vhost] [exchangeinfoitem ...]
// broker状态
rabbitmqctl status
// 对 RabbitMQ节点进行健康检查
rabbitmqctl node_health_check
// 显示每个运行程序环境中每个变量的名称和值 rabbitmqctl environment
// 为所有服务器状态生成一个服务器状态报告,并将输出重定向到一个文件
rabbitmqctl report
// 执行任意 Erlang表达式
rabbitmqctl eval {expr}
rabbitMQ 配置
TODO
rabbitMQ 运维
单台RabbitMQ可满足1000/s的吞吐量。集群模式下,某个节点奔溃,那么这节点上消息会丢失
集群所有节点备份所有元数据信息(名称和属性)。基于存储空间和性能考虑,只会在宿主节点创建队列进程和完整队列信息(元数据,状态,内容),其他节点只知道队列元数据和指向该队列所在节点指针。
多机多节点
// 配置hosts文件
192.168.0.2 node1
192.168.0.3 node2
192.168.0.4 node3
// 编辑 RabbitMQ的cookie文件,以确保各个节点的 cookie 文件使用的是同一个值
cookie(/var/lib/rabbitmq/.erlang.cookie)
// 启动 node1、node2 和 node3 这 3 个节点的 RabbitMQ 服务
rabbitmq-server –detached
// 将node2 和 node3 节点加入node1节点的集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
如果关闭集群中所有节点,在启动的时最后关闭的那个节点第一个启动, 如果不是,这个节点会等待最后关闭的节点启动 直到超时失败
可通过 rabbitmqctl forget_cluster_node 将最后关闭节点剔出集群
升级集群节点
- 指令备份当前配置,
- 关闭所有生产者,等待消费者完成所有消息,
- 关闭节点,解压最小rabbitmq在磁盘节点,
- 启动磁盘节点,启动其他内存节点
镜像队列处理路由消息到合适的队列,也要将消息投递到镜像队列中拷贝
warren和sovel:故障转移和复制 warren是指一对主/备独立服务器
日志
RABBITMQ_NODENAME.log 文件查看日志 rabbitmq.config配置文件log_levels参数配置日志级别
单节点恢复
机器故障,机器掉电等导致单节点宕机。
// 命令来将故障节点剔除
rabbitmqctl forget_cluster_node {nodename}
// 要将故障节点的 IP 地址从连接列表里删除
// 删除当前故障机器的 RabbitMQ 中的 Mnesia
数据
// 恢复后作为新节点加入集群
集群迁移
TODO
磁盘节点,内存节点
单节点系统只允许磁盘类型的节点;
集群中要求至少有一个磁盘节点,当磁盘节点挂了时,集群可以继续路由消息,但是不能:创建队列 创建交换器 创建绑定 添加用户 更改权限 添加或删除节点
添加内存节点时确保告知其所有磁盘节点,以便内存节点能在重启时重新加入集群(如果内存节点重启时,磁盘节点宕机,就比较麻烦了)
使用rest api控制rabbitmq
// 查看host虚拟机队列queue的数据统计
http:localhost:15672/api/queues/host/queue
// 查看当前链接列表和详情
/api/connections
// 下载或者上传完整的rabbitmq服务器配置,队列,交换器和绑定
/api/all-configuration
// 列出集群中所有节点
/api/nodes
// 创建或者查看rabbitmq用户
/api/users/<user>
// 查看或者创建虚拟主机
/api/vhosts/<vhost>
// 为用户设置权限
/api/permissions/<vhost>/<user>
TODO
Rabbitmq监控
TODO