标签归档:RabbitMQ

Qmeta: RabbitMQ元数据备份工具

RabbitMQ的元数据泛指服务器端的Exchange、Binding、Queue、Vhost、Policy、账户、权限等定义信息。对元数据进行备份有两个好处:

1. 队列迁移。可以直接导入,不必所有信息重新输入。
2. 故障恢复。『单机磁盘故障引发RabbitMQ镜像队列数据丢失』一文中分析了镜像队列数据丢失的场景,如果元数据进行过备份的话,可以快速恢复服务。

RabbitMQ管理插件本身提供了元数据备份的API(GET /api/definitions),关键是如何高效地备份,特别是当手上有上百个RabbitMQ节点需要运维的时候。

Qmeta是我用Go写的一个RabbitMQ元数据备份小工具,它支持批量备份元数据,并以json的形式存储在备份目录下。

使用方式:
1. 构建

$ git clone git@github.com:fengchj/ftool.git
$ cd ftool/qmeta
$ go build 

2. 使用

$ qmeta -h
Usage of qmeta:
  -file="config": config file contains RabbitMQ node addrs.
$ cat config
127.0.0.1:15672 guest guest
10.20.30.40:15672 guest guest
$ qmeta -file config
Host 127.0.0.1 backup done!
Host 10.20.30.40 backup done!
$ cd qmeta_{time-in-second}
$ ls -l
127.0.0.1.json
10.20.30.40.json

简单介绍下Qmeta的实现:

1. flag包解析配置文件路径。
2. 每个节点的备份任务交给独立的goroutine处理。
3. net/http包发送HTTP GET请求,设置连接超时参数Timeout。
4. 所有goroutine共享http.Client实例,协程安全。
5. 使用带缓冲区的channel来实现主程序和goroutine的同步,当所有goroutine返回后,安全关闭channel。
6. 协程在defer语句中进行channel数据写入,以此来保证是否备份成功,goroutine都会返回。

--EOF--

Erlang进程GC引发RabbitMQ Crash

生产环境下曾数次出现过RabbitMQ异常崩溃的场景,好在镜像队列的部署方式对服务未产生影响。

我们的RabbitMQ部署环境是:

KVM: 2 VCPU, x86_64, 4G RAM, swap disabled。
OS: Debian 7.0, Linux Kernel 3.2.0-4-amd64
Erlang: R15B01 (erts-5.9.1) [64-bit] [smp:2:2]
RabbitMQ: 3.1.5, {vm_memory_high_watermark,0.4}, {vm_memory_limit,1663611699}

分析最近一次崩溃产生的erl_crash.dump文件:

=erl_crash_dump:0.1
Tue Jun 16 00:46:49 2015
Slogan: eheap_alloc: Cannot allocate 1824525600 bytes of memory (of type "old_heap").
System version: Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:2:2] [async-threads:30] [kernel-poll:true]
Compiled: Sun Jan 27 18:19:34 2013
Taints:
Atoms: 22764
=memory
total: 1980253120
processes: 1563398383
processes_used: 1563397555
system: 416854737
atom: 703377
atom_used: 674881
binary: 385608216
code: 18475052
ets: 7643488
......

发现Crash原因是:Cannot allocate 1824525600 bytes of memory (of type "old_heap")。从数据来看,Erlang虚拟机已占用约1.98G内存(其中分配给Erlang进程的占1.56G),此时仍向操作系统申请1.82G,因为操作系统本身以及其他服务也占用一些内存,当前系统已经分不出足够的内存了,所以Erlang虚拟机崩溃。

此处有两个不符合预期的数据:
1. vm_memory_limit控制在1.67G左右,为什么崩溃时显示占用了1.98G?
2. 为什么Erlang虚拟机会额外再申请1.82G内存?

经过一些排查和总结,发现几次崩溃都是出现在队列中消息堆积较多触发了流控或者有大量unack消息requeue操作的时候,基本把原因确定为Erlang对进程进行Major GC时系统内存不足上。

在之前的『RabbitMQ与Erlang』一文中,曾简单介绍过Erlang的软实时特性,其中按进程GC是实现软实时的一个重要手段:

RabbitMQ将每个队列设计为一个Erlang进程,由于进程GC也是采用分代策略,当新老生代一起参与Major GC时,Erlang虚拟机会新开内存,根据root set将存活的对象拷贝至新空间,这个过程会造成新老内存空间同时存在,极端情况下,一个队列可能短期内需要两倍的内存占用量。

这也是RabbitMQ将内存流控的安全阈值设置为0.4的原因,即使double,也是0.8,还是安全的。0.4的意思是,当broker的内存使用量大于40%时,开始进行生产者流控,但是该参数并不承诺broker的内存使用率不大于40%。官方文档也强调过这一点:

The default memory threshold is set to 40% of installed RAM. Note that this does not prevent the RabbitMQ server from using more than 40%, it is merely the point at which publishers are throttled. Erlang's garbage collector can, in the worst case, cause double the amount of memory to be used (by default, 80% of RAM).

实际上通过RabbitMQ运行日志很容易证实,broker的实际内存使用量可以远远大于vm_memory_high_watermark设定值,比如:

$ less /var/log/rabbitmq/rabbit@10.xx.xx.xx.log
=INFO REPORT==== 16-Jun-2015::00:46:25 ===
vm_memory_high_watermark set. Memory used:2497352280 allowed:1663611699
=WARNING REPORT==== 16-Jun-2015::00:46:25 ===
memory resource limit alarm set on node 'rabbit@10.xx.xx.xx'.

内存流控阈值掐在1.6G,但是实际使用了近2.5G!再加上操作系统本身和其他服务吃掉的内存,轻松超过3G。如果此时触发Erlang进程Major GC,需要占用双倍的当前堆内存大小,那么报本文开头的old_heap堆内存分配错误也就不足为奇了。

知道了问题所在,如何解决这个问题呢?

遗憾的是,我目前也没有确切的解决方法(如有人知道,烦请告知)。但是从问题的形成原因来看,至少从以下几个方面可以显著降低问题出现概率。

1. RabbitMQ独立部署,不与其他Server共享内存资源。
2. 进一步降低vm_memory_high_watermark值,比如设置成0.3,但是这种方式会造成内存资源利用率太低。
3. 开启swap,问题在于容易造成性能和吞吐量恶化。
4. 升级RabbitMQ至新版(3.4+)。我个人觉得这个问题的根本原因在于RabbitMQ对内存的管理上,特别是早期版本,Matthew(RabbitMQ作者之一)也曾谈到过这个问题。尽管设计之初就考虑了各种优化,使得队列进程私有堆尽量小,比如当触发了某些条件,会page out到磁盘,或以二进制数据的方式存储在共享数据区,但是即便如此,在某些特定时刻,队列进程私有堆仍会消耗大量内存,这点从Management Plugin或者Remote shell里都可以看出来,而每次的“Cannot allocate old_heap”问题恰恰也总是出现在这些时刻。RabbitMQ 3.4版本发布后,官方发布了一篇新版内存管理方面的博客,从介绍上来看,要准确计算和控制当前系统使用的内存量确实非常困难。所以作者的感慨有理:

Of course, this still doesn't give a perfect count of how much memory is in use by a queue; such a thing is probably impossible in a dynamic system. But it gets us much closer.

--EOF--

一种分布式系统消息服务器设计

设计一个分布式系统,首先面临的就是如何解决服务间的通信问题,同步还是异步,是采用基于消息总线的事件驱动架构(EDA)还是分布式服务框架,这在很大程度上决定了系统的可扩展性。

消息服务器的可选择性很多,比如早些年的XMPP服务器,传统的JMS服务器,还有目前比较流行的AMQP消息服务器,简单的优缺点对比如下:

类型 优点 缺点
Openfire (XMPP) 1. 成熟,稳定。
2. 适合做聊天服务,
   在IM领域(Gtalk,网易POPO等)应用广泛。
1. 消息可靠性无保障。
2. 路由策略不够灵活。
3. 集群模式不完善。
4. 协议太重。
ActiveMQ (JMS) 1. 成熟,稳定。
2. 与Java应用契合度高。
1. 路由策略不够灵活。
2. 集群模式不稳定。
RabbitMQ (AMQP) 1. 成熟,稳定。
2. 路由策略灵活。
3. 消息可靠传输。
4. 集群方案成熟。
1. 配置项多,学习和运维成本高。

本文分享一种基于RabbitMQ的消息服务器设计方案:

MQ Server

如上图所示,黄色虚线表示的是分布式系统的一个子服务,假设分别为WEB、REPO和CTRL服务。P表示生产者,C表示消费者。生产者把消息发送到一个topic模式的Exchange (X),根据route key和binding key的运算结果,将消息路由到相应队列,再由队列投递给具体的消费者。

我们将请求从服务调用方的角度分成两类:同步和异步。同步(rpc.call)是指需要应答的请求,比如获取依赖服务的状态。异步(rpc.cast)是指无需应答的请求,比如下发一个命令。

1. 对于同步请求,服务调用方将消息publish到Exchange,根据route key路由到相应队列,投递给服务提供方的消费者处理。服务提供方消费者处理完请求后,将响应内容封装成消息格式,指定route key(TYPETYPE.${HOSTNAME}),将处理结果消息返回。

2. 对于异步请求,服务调用方将消息publish到Exchange,根据route key路由到相应队列,投递给服务提供方的消费者处理。服务提供方消费者处理完请求后无需返回。

无论同步还是异步,服务调用方在发出请求(消息)后会立即返回,不会阻塞。如果是同步请求,那么只需提供回调处理函数,等待响应事件驱动。

每个服务启动后会初始化一条AMQP连接(基于TCP),该连接由3个Channel复用:一个Channel负责生产消息,一个Channel负责从TYPE(REPO/CTRL/WEB等)类型的队列消费消息,一个Channel负责从TYPE.${HOSTNAME}类型的队列消费消息。从队列的角度来看,一个TYPE.${HOSTNAME}类型队列只有一个消费者,一个TYPE类型队列可能有多个消费者。

这样的设计满足以下四类需求:

1. 点对点(P2P)或请求有状态服务:消息的route key设置为TYPE.${HOSTNAME}。比如host1上的WEB服务需要向host2上的REPO服务发送同步请求,只需将消息的route key设置为REPO.host2即可。REPO服务处理完请求后,将响应消息的route key设置为WEB.host1,发送回消息服务器。再比如REPO服务是有状态服务,伸缩性不好做,需要WEB服务做Presharding或者一致性哈希来决定调用哪个REPO服务,也跟上面一样,WEB服务根据计算好的值填充REPO.${HOSTNAME},进行点对点模式的消息通信。

2. 请求无状态服务:如果服务提供方是无状态服务,服务调用方不关心由哪个服务进行响应,那么只需将消息的route key设置为TYPE。比如CTRL是无状态服务,host1上的WEB服务只需将消息的route key设置为CTRL即可。CTRL队列会以Round-robin的调度算法将消息投递给其中的一个消费者。视WEB服务是否无状态而定,CTRL可以选择将响应消息的route key设置为WEB(假设WEB无状态)或者WEB.host1(假设WEB有状态)。

3. 组播:如果服务调用方需要与某类服务的所有节点通信,可以将消息的route key设置为TYPE.*,Exchange会将消息投递到所有TYPE.${HOSTNAME}队列。比如WEB服务需通知所有CTRL服务更新配置,只需将消息的route key设置为CTRL.*

4. 广播:如果服务调用方需要与所有服务的所有节点通信,也就是说对当前系统内所有节点广播消息,可以将消息的route key设置为*.*

本方案优缺点如下:

优点:
1. 路由策略灵活。
2. 支持负载均衡。
3. 支持高可用部署。
4. 支持消息可靠传输(生产者confirm,消费者ack,消息持久化)。
5. 支持prefetch,流控。

缺点:
1. 存在消息重复投递的可能性。
2. 对于多服务协作的场景支持度有限。比如以下场景:WEB服务发送同步请求给CTRL服务,CTRL本身无法提供该服务,需要调用REPO服务,再将REPO服务的响应结果返回给WEB。这个时候就需要CTRL缓存WEB请求,直至REPO响应。
3. 缺少超时管理,错误处理等。

以上列举缺点需要由业务方考虑解决。

顺便再提供本方案的一个简单SDK:Gear

Gear SDK提供的功能包括:
1. 基础组件(Exchange, Binding, Queue, etc)初始化。
2. 连接复用,断线重连。
3. P2P方式,组播,广播消息发送。
4. 异步消息接收。

--EOF--

RabbitMQ不同Confirm模式下的性能对比

前几天看到一篇文章『Evaluating persistent, replicated message queues』,作者比较客观地分析了多种分布式消息服务器间集群和消息可靠传输机制,比对了各自的性能情况,他的测试场景为:

1. 分布式队列,节点间数据复制(同步、异步)
2. 消息可靠性等级最高(持久化、ack等)

由于不同的消息服务器实现原理不同,会造成集群节点间数据复制代价和消息可靠性上的差异,最终文章给出的基准性能测试数据(消息吞吐量维度)总结如下图:
benchmark-summary

从上图来看,Kafka毫无争议的拥有最大的消息吞吐量。但是RabbitMQ的数据却是有点反直觉,因为之前给人的感觉RabbitMQ作为一款工业级的消息队列服务器,虽说不是靠高性能扬名,但也不至于让性能问题成为累赘。

网上的基准测试结果只能作为参考,不能作为技术选型的依据。当我们看到某类产品(Web服务器、缓存、队列、数据库等)的一组性能测试数据时,首先要了解以下三点:

1. 作者是否利益相关,利益相关往往导致给出的数据和结论偏向自家产品。
2. 作者是否能hold得住不同产品的技术细节,有时候一个参数值的优化会影响到产品的表现。
3. 具体的测试场景和参数。这个不用多解释了,每个产品都有自己擅长和不擅长的使用场景。

对于上述的第2点和第3点,本质上是由于信息不对称造成的。本文的目的不是为了质疑那篇文章的结论,而是借此分析一下是否可以通过一些客户端程序优化来提升RabbitMQ性能。

之前自己也做过RabbitMQ的性能测试,对于固定消息体大小和线程数,如果消息持久化、生产者confirm、消费者ack三个参数中开启消息持久化和生产者confirm,那么对性能影响相当致命,能够衰减一个数量级,吞吐量甚至会退化到几百msg/s。

消息持久化的优化没太好方法,用更好更快的物理存储(SAS,SSD,RAID卡)总会带来改善的。生产者confirm这一环节的优化则主要在于客户端程序的优化上。归纳起来,客户端实现生产者confirm有三种编程方式:

1. 普通confirm模式。每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
2. 批量confirm模式。每次发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
3. 异步confirm模式。提供一个回调方法,服务器端confirm了一条(或多条)消息后SDK会回调这个方法。

从编程实现的复杂度上来看:

第1种普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务器端返回false或者超时时间内未返回,客户端进行消息重传。

第2种批量confirm模式稍微复杂一点,客户端程序需要定期(每x秒)或定量(每x条)或者两者结合来pubish消息,然后等待服务器端confirm。相比普通confirm模式,批量可以极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

第3种异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Channel发出的消息序号),我们需要自己为每个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率角度上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK里的waitForConfirms()方法也是通过SortedSet维护消息序号的。

我写了一个简单的RabbitMQ生产者confirm环节性能测试程序放在了github上,它实现了上述三种confirm模式,并且有丰富的参数可以配置,比如生产者数量、消费者数量、消息体大小、消息持久化、生产者confirm、消费者ack等等,可以根据使用场景组合。以下的讨论都是基于这个测试程序跑出来的结果。

首先是测试环境:

OS: OSX 10.10, Darwin 14.0.0, x86_64
Erlang: R16B03-1 (erts-5.10.4)
RabbitMQ: 3.5.1
CPU: 2.5 GHz Intel Core i5
Disk: SATA
Message Size: 1000 Bytes

一、单线程,未开启消息持久化和消费者ack。

普通 批量,50 msg/批 批量,100 msg/批 批量,200 msg/批 异步
2931 msg/s 6581 msg/s 7019 msg/s 7563 msg/s 8550 msg/s

可见,单线程跑时批量和异步confirm甩开普通confirm一大截了。严格来讲,异步confirm不存在单线程模式,因为回调handleAck()方法的线程和publish消息的线程不是同一个。

二、多线程,开启消息持久化和消费者ack。

多线程下普通confirm模式:

100线程 500线程 800线程 1000线程
659 msg/s 2110 msg/s 2353 msg/s 2477 msg/s

多线程下批量confirm模式:

100线程,50 msg/批 100线程,100 msg/批 100线程,500 msg/批 500线程,100 msg/批
3828 msg/s 3551 msg/s 3567 msg/s 3829 msg/s

多线程下异步confirm模式:

50线程 100线程 200线程
3621 msg/s 3378 msg/s 2842 msg/s

以上是不同线程数量的维度下,相同confirm模式的性能数据,大致来看,遵循线程数越大,吞吐量越大的规律。当然,当线程数量达到一个阈值之后,吞吐量会下降。通过这些数据还能得到一个隐式的结论:不论哪种confirm模式,通过调整客户端线程数量,都可以达到一个最大吞吐量值。无非是达到这个最大值的代价不同,比如异步模式需要少量线程数就能达到,而普通模式需要大量线程数才能达到。

最后再从相同线程数量(100线程数)的维度下,分析下不同confirm模式的性能数据:

普通 批量,50 msg/批 批量,100 msg/批 批量,500 msg/批 异步
659 msg/s 3828 msg/s 3551 msg/s 3567 msg/s 3378 msg/s

由此可见,选取了一个典型的线程数量(100)后,普通confirm模式性能相比批量和异步模式,差了一个数量级。

从以上所有的数据分析来看,异步和批量confirm模式两者没有明显的性能差距,实际上他们的实现原理是一样,无非是客户端SDK进行了不同的封装而已。所以,只需从可编程性的角度选择异步或批量或者两者结合的模式即可。相比而言,选择普通confirm模式只剩编程简单这个理由了。

回到本文开头提到的不同队列服务之间的性能对比,实际上,我认为RabbitMQ最大的优势在于它提供了最灵活的消息路由策略、高可用和可靠性,可靠性又分为两部分(消息可靠性和软件可靠性),以及丰富的插件、平台支持和完善的文档。然而,由于AMQP协议本身的灵活性导致了它比较重量,所以造成了它相比某些队列服务(如Kafka)吞吐量处于下风。因此,当选择一个消息队列服务时,关键还是看需求上更看重消息吞吐量、消息堆积能力还是路由灵活性、高可用、可靠传输这些方面,只有先确定使用场景,根据使用场景对不同服务进行针对性的测试和分析,最终得到的结论才能成为技术选型的依据。

--EOF--

单机磁盘故障引发RabbitMQ镜像队列数据丢失

某分布式消息推送系统,用RabbitMQ做消息分发。RabbitM采用镜像队列模式、双机HA部署,前端使用负载均衡,因此RabbitMQ主从节点上都会有生产者和消费者连接。此为应用背景。

版本:
OS: Debian 7, 3.2.39-2 x86_64 GNU/Linux
Erlang: R15B01 (erts-5.9.1)
RabbitMQ: 3.0.2 (ha policy, ha-mode:all)

故障现象还原:

1. 节点A的数据盘故障(磁盘控制器故障、无法读写),所有原本A上的生产者消费者failover到B节点。
2. 从节点B的WebUI上看,所有队列信息(包括队列元信息和数据)丢失,但是exchange、binding、vhost等依旧存在。
3. 节点B的日志中出现大量关于消费请求的错误日志:

=ERROR REPORT==== 23-Apr-2015::19:31:26 ===
connection <0.1060.0>, channel 1 - soft error:
{amqp_error,not_found,
"home node 'rabbit@192.168.156.210' of durable queue 'test' in vhost 
'2891f31d' is down or inaccessible", 'basic.consume'}

4. 从生产者端看来一切正常,依旧会收到来自节点B的confirm消息(basic.ack amqp方法)。

经过一番排查定位,上述现象实际上有两个坑在里面:

一、 在数据可靠性方面,镜像队列也不完全可靠。
二、 要保证消息可靠性,生产者端仅仅采用confirm机制还不够。

对于第一个坑:

第一感觉应该是bug,其他人也出现过。在云环境下要重现这个Bug很简单,申请两台云主机做RabbitMQ HA,部署镜像队列。另外申请两块云硬盘,分别挂载到两台云主机,作为数据盘为RabbitMQ提供元信息和队列数据提供存储(${RABBITMQ_HOME}/var/lib),模拟生产者和消费者对主节点(假设为节点A)进行生产消费,此时可以从云平台管理界面上卸载节点A上的云硬盘,观察节点B RabbitMQ WebUI,可以发现队列不见了。:)

换上最新版的3.5.1就没这个问题,但是要确认是哪个版本对它进行修复的却耗费了很久。初步翻阅了3.0.3 ~ 3.5.1之间大概20多个版本的Changelog,没有看到哪个bugfix的描述符合。只好用最笨的办法,对所有版本号采用二分法,挨个版本测试,最终将修复版本确定为3.4.0 (Changelog)。但是即便如此,也还是没有看到具体是哪个bugfix修复了此问题。

对于第二个坑:

对于队列不存在了,RabbitMQ依然向生产者返回confirm消息的情况,实际上通过生产者端的正确编程姿势可以避免。RabbitMQ官方有过说明:

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack.

也就是说,对于那些路由不可达的消息(根据routerkey,找不到相应队列),如果basic.publish方法没有设置mandatory参数(关于mandatory参数,可以参考『AMQP协议mandatory和immediate标志位区别』),那么RabbitMQ会直接丢弃消息,并且向客户端返回basic.ack消息;如果basic.publish方法设置了mandatory参数,那么RabbitMQ会通过basic.return消息返回消息内容,然后再发送basic.ack消息进行确认。

因为节点上的队列元信息在第一个踩的坑里已经丢失了,所以所有发送到此节点上的消息必然是不可到达的,加上未在basic.publish方法上加mandatory参数,因此消息直接被RabbitMQ丢弃,并被正常confirm掉。

要避免踩这个坑,Java SDK的Channel对象提供了addReturnListener()回调方法,当收到服务器端的basic.return消息时,ReturnListener方法被调用,生产者端可以进行消息重传。当然,前提是basicPublish()方法的mandatory参数已经设置为true。

--EOF--