rabbitmq

前言

各种mq产品不统一,导致无法联合组成单一的总线,java程序只需要对JMS API编程,使用合适的MQ驱动,隐藏其中实际接口,但是单独化标准型接口胶合众多不同接口,会使JMS程序很脆弱。

概念

AMQP协议

AMQP命令

此处输入图片的描述 此处输入图片的描述 此处输入图片的描述

信道channel

建立一条TCP链接,创建N多条信道(一个线程一个channel实例,多线程共享channel非安全),发送AMQP命令,复用连接,减少tcp连接创建开销

元数据

交换器

交换器不过是一张查询表,创建新的交换器不过是将查询表添加到集群所有节点上,集群中每个节点拥有每个交换器所有信息,节点故障时,只需生产者 重新连接到集群上,而发出过程未到达队列中时的消息通过confirm模式确保消息不丢失

类型

是否持久化

持久化将交换器存盘,服务器重启不会丢失exchange信息

是否自动删除

至少一个队列或交换器与该交换器绑定,之后当所有与该exchange绑定的队列/交换器都解绑,自动删除交换器

是否内置

内置交换器,客户端无法将消息直接发送到这个交换器中,只能通过交换器路由到交换器

队列

集群中不是每一个节点都有所有队列完全拷贝,其中一个节点挂掉,该节点关联的队列和绑定都会消失,其中没有持久化的队列会被消费者重新声明创建,持久化的队列只有等待重启

是否持久化

持久化将交换器存盘,服务器重启不会丢失exchange信息

是否排它

该队列仅对第一次声明它的连接可见,且连接断开自动删除。其他连接不允许建立同名排它队列。一个客户端同时发送,读取消息应用场景

是否自动删除

当至少有一个消费者曾连接到该队列,之后所有与该queue连接的的消费者都断开时,队列自动删除。

生产中需要对队列的流量,内存,网卡占用有清晰认知,预估平均值和峰值,以便合理分配硬件资源

rabbitmq不将队列内容和状态复制到所有节点原因

  1. 存储空间
  2. 性能 消息发布需要将消息复制到每一个节点,持久化消息会触发磁盘活动

消息生产

channel.basicPublish

备份交换器

未设置mandatory,会导致消息丢失,设置了,客户端需添加 addReturnListener。通过声明channel.exchangeDeclare方法alternate-exchange参数 使用备份交换器将未被路由消息存储RabbitMQ中。

Map<String,Object> args = new HashMap();
args.put("alternate-exchange","myAe");
channel.exchangeDeclare("nomalExchange","direct",true,false,args);
channel.exchangeDeclare("myAe","fanout",true,false,null);

过期消息TTL

// 该方式一旦消息过期,不会立即丢弃,等消费时判断过期再丢弃 随机时间消息遍历带来开销 AMOP.BasicProperties properties = new AMOP.BasicPropertiesBuilder(); builder.expiration(“60000”);// 设置TTL=6000ms channel.basicPublish(properties)


### **队列过期TTL**
```java
Map<String,Object> args = new HashMap();
args.put("x-expires",6000); // 队列删除前处于未使用状态时间(没任何消费者,未被重新声明,未调用Basic.Get) 设置为0立即丢掉
channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);

DLX死信队列

生产者发送消息到队列A,队列A没有消费者消费,过了过期时间,消息被重新转发到死信队列进行处理

channel.exchangeDeclare("dlx_exchange","direct");
Map<String,Object> args = new HashMap();
args.put("x-dead-letter-exchange","dlx_exchange");
args.put("x-dead-letter-routing-key","dlx-routing-key");
channel.queueDeclare("myqueue",false,false,false,args);

优先级队列

设置队列最大优先级

Map<String,Object> args = new HashMap();
args.put("x-max-priority", 10); 
channel.queueDeclare("myqueue",true,false,false,args);

设置消息优先级

// 该方式一旦消息过期,会从队列丢弃
AMOP.BasicProperties.Builder builder = new AMOP.BasicPropertiesBuilder.Builder();
builder.priority(5);
channel.basicPublish("exchange_priority","rk_priority",builder.build(),("messages").getBytes());

消费者消费速度大于生产者速度,Broker没有堆积情况下,优先级失效

消息消费

RabbitMQ消费模式为两种:

消息确认机制

消息拒绝

消息分发

多消费者时,轮询方式分发消息到消费者。channel.basicQos运行信道消费者保持最大未确认消息数量。均衡了消费者任务(对pull模式无效)

一个信道可以消费多个队列,设置prefetchCount 大于 0 ,信道需要和各个队列协调确保发送的消息都没有超过所限定的prefetchCount的值,使 RabbitMQ 的性能降低,尤其是队列分散集群多个 Broker 节点。

消息投递流程

生产者把消息发布到交换器(Exchange)上,绑定决定如何从路由器到达队列,队列中消息被消费者消费。

生产者生产消息

消费者消费消息

此处输入图片的描述

RPC

TODO

特点

消息持久化

消息和队列应该同时设置持久化,否则毫无意义。应分场景设置持久化,毕竟磁盘IO操作开销远大于内存操作

RabbitMQ并不会每条消息都同步存盘,系统flush操作,从而导致极端情况消息丢失,通过1镜像队列来提高可靠性2引入事务机制或发送方确认机制保证持久化

事务机制(性能损失)

channel.txSelect();信道设置为事务模式
channel.txCommit();用于提交事务
channel.txRollback();用于事务回滚

发送方确认机制(异步,轻量级)消息,队列持久化,写入磁盘后进行确认

channel.confimSelect();设置信道为confim模式消息指定唯一ID

channel.basicPublish("exchange","routingKey",null,("messages").getBytes());
if(!channel.waitForConfirms()) { // 必须和confimSelect配套出现
    // send message fail
}

事务机制和发送方确认机制两者互斥 事务机制和发送方确认机制配合mandatgory或备份交换器保证传输可靠性

批量confirm

能够提高性能。通过定期,定量发送多少条消息后调用waitForConfirm等待ACK确认,但会导致一条返回nack或超时,所有消息需重发。减缓性能

try { 
 channel.confirmSelect(); 
 int MsgCount = 0; 
 while (true) { 
 channel.basicPublish("exchange", "routingKey", 
 null, "batch confirm test".getBytes()); 
 //将发送出去的消息存入缓存中,缓存可以是
 // 一个 ArrayList 或者 BlockingQueue 之类的
 if (++MsgCount >= BATCH_COUNT) { 
 MsgCount = 0; 
 try { 
 if (channel.waitForConfirms()) { 
 //将缓存中的消息清空
 } 
 //将缓存中的消息重新发送
 } catch (InterruptedException e) { 
 e.printStackTrace(); 
 //将缓存中的消息重新发送
 } 
 } 
 } 
} catch (IOException e) { 
 e.printStackTrace(); 
}

异步confirm

通过channel.addConfirmListener添加监听器,重写HandleAck和HandleNack方法

channel.confirmSelect(); 
channel.addConfirmListener(new ConfirmListener() { 
 public void handleAck(long deliveryTag, boolean multiple) 
 throws IOException { 
 System.out.println("Nack, SeqNo: " + deliveryTag 
 + ", multiple: " + multiple); 
 if (multiple) { 
 confirmSet.headSet(deliveryTag - 1).clear(); 
 } else { 
 confirmSet.remove(deliveryTag); 
 } 
 } 
 public void handleNack(long deliveryTag, boolean multiple) 
 throws IOException { 
 if (multiple) { 
 confirmSet.headSet(deliveryTag - 1).clear(); 
 } else { 
 confirmSet.remove(deliveryTag); 
 } 
 //注意这里需要添加处理消息重发的场景
 } 
}); 
//下面是演示一直发送消息的场景
while (true) { 
 long nextSeqNo = channel.getNextPublishSeqNo(); 
 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, 
 MessageProperties.PERSISTENT_TEXT_PLAIN, 
 ConfirmConfig.msg_10B.getBytes()); 
 confirmSet.add(nextSeqNo); 
}

消息顺序性

网络中断,事务失败,死信,异步都会导致,通过全局id协调

消息可靠性

消息中间件传输保障三等级

RabbitMQ支持“最多一次”和“最少一次”。后者需事务/confirm机制,配置mandatory或备份交换器,加上持久化处理,加上消费者手动 ACK

RabbitMQ 实现exactly once:在最少一次基础上在消费端实现**业务的幂等性

安装与运行

通过docker安装,部署

// 下载镜像
docker pull rabbitmq:3.7.16-management

// 启动容器
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 5672:5672 -p 15672:15672  <container id>

多租户,权限

erlang虚拟机简称节点,多个erlang程序可以运行在同一个节点,节点之间可以通信。 vhost本质一个独立rabbitmq服务器,拥有自己队列,交换器,绑定,权限。系统默认 “/” vhost

// 创建vhost
rabbitmqctl add_vhost vhost1
// 查看vhost
rabbitmqctl list_vhosts vhost1
// 删除vhost
rabbitmqctl delete_vhost vhost1

// vhost 授权用户名 可配置资源 可写资源 可读资源
rabbitmqctl set_permissions -p /vhost1 user_admin '.*' '.*' '.*'
// 清除权限
rabbitmqctl clear_permissions -p /vhost1 user_admin

// 列举权限
rabbitmqctl list_permissions vhost1

// 列举用户权限
rabbitmqctl list_user_permissions user_admin

// 创建用户
rabbitmqctl add_user root root123

// 修改用户密码
 rabbitmqctl change_password root root
 
// 清除用户密码
rabbitmqctl clear_password root

// 删除用户
rabbitmqctl delete_user root

// 列举用户
rabbitmqctl list_users

// 设置用户角色
rabbitmqctl set_user_tags root admin

检查

rabbit编码和模式

解耦:

  1. 异步状态,分离请求和动作
  2. 提供扩展性
  3. 零成本api(消息通信)

发后即忘模型:

rabbitmq实现RPC等待响应,私有队列和发送确认 AMQP有reply_to,生产者通过该字段可确认队列,并监听队列等待应答

rabbitmq集群:运行消费者和生产者rabbit节点奔溃情况下继续运行,通过添加更多节点来线性扩展消息通信吞吐量。

Rabbitmq manage

存放 $RABBITMQ_HOME/plugins 目录下

指标

web管理rabbitmq

rabbitmq-plugins enable rabbitmq_management –启用插件

默认用户guest 只允许localhost登录。

(1)超级管理员(administrator)

可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

(2) 监控者(monitoring)

可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

(3) 策略制定者(policymaker)

可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

与administrator的对比,administrator能看到这些内容

(4) 普通管理者(management)

应用管理

// 停止运行 RabbitMQ 的 Erlang 虚拟机和 RabbitMQ 服务应用 pid_file存放于 Mnesia 目录中
rabbitmqctl stop [pid_file]
rabbitmqctl shutdown

// 停止 RabbitMQ 服务应用
rabbitmqctl stop_app

// 启动 RabbitMQ 应用
rabbitmqctl start_app

// 等待 RabbitMQ 应用的启动
rabbitmqctl wait [pid_file]

// RabbitMQ 节点重置还原到最初状态
rabbitmqctl reset

// 强制将 RabbitMQ 节点重置还原到最初状态
rabbitmqctl force_reset

// RabbitMQ 节点轮换日志文件 
rabbitmqctl rotate_logs {suffix}

// 将节点加入指定集群中
rabbitmqctl join_cluster {cluster_node} [--ram]

// 显示集群的状态
rabbitmqctl cluster_status 

// 修改集群节点的类型 
rabbitmqctl change_cluster_node_type {disc|ram} 

// 将节点从集群中删除
rabbitmqctl forget_cluster_node [--offline] 

// 在集群中的节点应用启动前咨询 clusternode 节点的最新信息,并更新相应的集群信息 
rabbitmqctl update_cluster_nodes {clusternode} 

// 确保节点可以启动,即使它不是最后一个关闭的节点
rabbitmqctl force_boot

// 指示未同步队列 queue 的 slave 镜像可以同步 master 镜像行的内容
rabbitmqctl sync_queue [-p vhost] {queue}

// 取消队列 queue 同步镜像的操作
rabbitmqctl cancel_sync_queue [-p vhost] {queue}

// 设置集群名称
rabbitmqctl set_cluster_name {name}


// 队列状态  queueinfoitem可以是很多参数,
rabbitmqctl list_queues [-p vhost] [queueinfoitem ...]

// 交换器状态  
rabbitmqctl list_exchanges [-p vhost] [exchangeinfoitem ...]

// 连接状态  
rabbitmqctl list_bindings [-p vhost] [exchangeinfoitem ...]

// 信道状态  
rabbitmqctl list_channels [-p vhost] [exchangeinfoitem ...]

// 消费者状态  
rabbitmqctl list_consumers [-p vhost] [exchangeinfoitem ...]

// broker状态
rabbitmqctl status

// 对 RabbitMQ节点进行健康检查
rabbitmqctl node_health_check

// 显示每个运行程序环境中每个变量的名称和值 rabbitmqctl environment 

// 为所有服务器状态生成一个服务器状态报告,并将输出重定向到一个文件
rabbitmqctl report 
 
// 执行任意 Erlang表达式
rabbitmqctl eval {expr} 

rabbitMQ 配置

TODO

rabbitMQ 运维

单台RabbitMQ可满足1000/s的吞吐量。集群模式下,某个节点奔溃,那么这节点上消息会丢失

集群所有节点备份所有元数据信息(名称和属性)。基于存储空间和性能考虑,只会在宿主节点创建队列进程和完整队列信息(元数据,状态,内容),其他节点只知道队列元数据和指向该队列所在节点指针

多机多节点


// 配置hosts文件
192.168.0.2 node1 
192.168.0.3 node2 
192.168.0.4 node3

// 编辑 RabbitMQ的cookie文件,以确保各个节点的 cookie 文件使用的是同一个值
cookie/var/lib/rabbitmq/.erlang.cookie 

// 启动 node1、node2 和 node3 这 3 个节点的 RabbitMQ 服务
 rabbitmq-server detached
 
// 将node2 和 node3 节点加入node1节点的集群

rabbitmqctl stop_app 
rabbitmqctl reset 
rabbitmqctl join_cluster rabbit@node1 
rabbitmqctl start_app

如果关闭集群中所有节点,在启动的时最后关闭的那个节点第一个启动, 如果不是,这个节点会等待最后关闭的节点启动 直到超时失败

可通过 rabbitmqctl forget_cluster_node 将最后关闭节点剔出集群

升级集群节点

镜像队列处理路由消息到合适的队列,也要将消息投递到镜像队列中拷贝

warren和sovel:故障转移和复制 warren是指一对主/备独立服务器

日志

RABBITMQ_NODENAME.log 文件查看日志 rabbitmq.config配置文件log_levels参数配置日志级别

单节点恢复

机器故障,机器掉电等导致单节点宕机。

// 命令来将故障节点剔除
rabbitmqctl forget_cluster_node {nodename}

// 要将故障节点的 IP 地址从连接列表里删除

// 删除当前故障机器的 RabbitMQ 中的 Mnesia
数据

// 恢复后作为新节点加入集群

集群迁移

TODO

磁盘节点,内存节点

单节点系统只允许磁盘类型的节点;

集群中要求至少有一个磁盘节点,当磁盘节点挂了时,集群可以继续路由消息,但是不能:创建队列 创建交换器 创建绑定 添加用户 更改权限 添加或删除节点

添加内存节点时确保告知其所有磁盘节点,以便内存节点能在重启时重新加入集群(如果内存节点重启时,磁盘节点宕机,就比较麻烦了)

使用rest api控制rabbitmq

// 查看host虚拟机队列queue的数据统计
http:localhost:15672/api/queues/host/queue

// 查看当前链接列表和详情
/api/connections

// 下载或者上传完整的rabbitmq服务器配置,队列,交换器和绑定
/api/all-configuration

// 列出集群中所有节点
/api/nodes

// 创建或者查看rabbitmq用户
/api/users/<user>

// 查看或者创建虚拟主机
/api/vhosts/<vhost>

// 为用户设置权限
/api/permissions/<vhost>/<user>

TODO

Rabbitmq监控

TODO