标签归档:AMQP

一种分布式系统消息服务器设计

设计一个分布式系统,首先面临的就是如何解决服务间的通信问题,同步还是异步,是采用基于消息总线的事件驱动架构(EDA)还是分布式服务框架,这在很大程度上决定了系统的可扩展性。

消息服务器的可选择性很多,比如早些年的XMPP服务器,传统的JMS服务器,还有目前比较流行的AMQP消息服务器,简单的优缺点对比如下:

类型 优点 缺点
Openfire (XMPP) 1. 成熟,稳定。
2. 适合做聊天服务,
   在IM领域(Gtalk,网易POPO等)应用广泛。
1. 消息可靠性无保障。
2. 路由策略不够灵活。
3. 集群模式不完善。
4. 协议太重。
ActiveMQ (JMS) 1. 成熟,稳定。
2. 与Java应用契合度高。
1. 路由策略不够灵活。
2. 集群模式不稳定。
RabbitMQ (AMQP) 1. 成熟,稳定。
2. 路由策略灵活。
3. 消息可靠传输。
4. 集群方案成熟。
1. 配置项多,学习和运维成本高。

本文分享一种基于RabbitMQ的消息服务器设计方案:

MQ Server

如上图所示,黄色虚线表示的是分布式系统的一个子服务,假设分别为WEB、REPO和CTRL服务。P表示生产者,C表示消费者。生产者把消息发送到一个topic模式的Exchange (X),根据route key和binding key的运算结果,将消息路由到相应队列,再由队列投递给具体的消费者。

我们将请求从服务调用方的角度分成两类:同步和异步。同步(rpc.call)是指需要应答的请求,比如获取依赖服务的状态。异步(rpc.cast)是指无需应答的请求,比如下发一个命令。

1. 对于同步请求,服务调用方将消息publish到Exchange,根据route key路由到相应队列,投递给服务提供方的消费者处理。服务提供方消费者处理完请求后,将响应内容封装成消息格式,指定route key(TYPETYPE.${HOSTNAME}),将处理结果消息返回。

2. 对于异步请求,服务调用方将消息publish到Exchange,根据route key路由到相应队列,投递给服务提供方的消费者处理。服务提供方消费者处理完请求后无需返回。

无论同步还是异步,服务调用方在发出请求(消息)后会立即返回,不会阻塞。如果是同步请求,那么只需提供回调处理函数,等待响应事件驱动。

每个服务启动后会初始化一条AMQP连接(基于TCP),该连接由3个Channel复用:一个Channel负责生产消息,一个Channel负责从TYPE(REPO/CTRL/WEB等)类型的队列消费消息,一个Channel负责从TYPE.${HOSTNAME}类型的队列消费消息。从队列的角度来看,一个TYPE.${HOSTNAME}类型队列只有一个消费者,一个TYPE类型队列可能有多个消费者。

这样的设计满足以下四类需求:

1. 点对点(P2P)或请求有状态服务:消息的route key设置为TYPE.${HOSTNAME}。比如host1上的WEB服务需要向host2上的REPO服务发送同步请求,只需将消息的route key设置为REPO.host2即可。REPO服务处理完请求后,将响应消息的route key设置为WEB.host1,发送回消息服务器。再比如REPO服务是有状态服务,伸缩性不好做,需要WEB服务做Presharding或者一致性哈希来决定调用哪个REPO服务,也跟上面一样,WEB服务根据计算好的值填充REPO.${HOSTNAME},进行点对点模式的消息通信。

2. 请求无状态服务:如果服务提供方是无状态服务,服务调用方不关心由哪个服务进行响应,那么只需将消息的route key设置为TYPE。比如CTRL是无状态服务,host1上的WEB服务只需将消息的route key设置为CTRL即可。CTRL队列会以Round-robin的调度算法将消息投递给其中的一个消费者。视WEB服务是否无状态而定,CTRL可以选择将响应消息的route key设置为WEB(假设WEB无状态)或者WEB.host1(假设WEB有状态)。

3. 组播:如果服务调用方需要与某类服务的所有节点通信,可以将消息的route key设置为TYPE.*,Exchange会将消息投递到所有TYPE.${HOSTNAME}队列。比如WEB服务需通知所有CTRL服务更新配置,只需将消息的route key设置为CTRL.*

4. 广播:如果服务调用方需要与所有服务的所有节点通信,也就是说对当前系统内所有节点广播消息,可以将消息的route key设置为*.*

本方案优缺点如下:

优点:
1. 路由策略灵活。
2. 支持负载均衡。
3. 支持高可用部署。
4. 支持消息可靠传输(生产者confirm,消费者ack,消息持久化)。
5. 支持prefetch,流控。

缺点:
1. 存在消息重复投递的可能性。
2. 对于多服务协作的场景支持度有限。比如以下场景:WEB服务发送同步请求给CTRL服务,CTRL本身无法提供该服务,需要调用REPO服务,再将REPO服务的响应结果返回给WEB。这个时候就需要CTRL缓存WEB请求,直至REPO响应。
3. 缺少超时管理,错误处理等。

以上列举缺点需要由业务方考虑解决。

顺便再提供本方案的一个简单SDK:Gear

Gear SDK提供的功能包括:
1. 基础组件(Exchange, Binding, Queue, etc)初始化。
2. 连接复用,断线重连。
3. P2P方式,组播,广播消息发送。
4. 异步消息接收。

--EOF--

单机磁盘故障引发RabbitMQ镜像队列数据丢失

某分布式消息推送系统,用RabbitMQ做消息分发。RabbitM采用镜像队列模式、双机HA部署,前端使用负载均衡,因此RabbitMQ主从节点上都会有生产者和消费者连接。此为应用背景。

版本:
OS: Debian 7, 3.2.39-2 x86_64 GNU/Linux
Erlang: R15B01 (erts-5.9.1)
RabbitMQ: 3.0.2 (ha policy, ha-mode:all)

故障现象还原:

1. 节点A的数据盘故障(磁盘控制器故障、无法读写),所有原本A上的生产者消费者failover到B节点。
2. 从节点B的WebUI上看,所有队列信息(包括队列元信息和数据)丢失,但是exchange、binding、vhost等依旧存在。
3. 节点B的日志中出现大量关于消费请求的错误日志:

=ERROR REPORT==== 23-Apr-2015::19:31:26 ===
connection <0.1060.0>, channel 1 - soft error:
{amqp_error,not_found,
"home node 'rabbit@192.168.156.210' of durable queue 'test' in vhost 
'2891f31d' is down or inaccessible", 'basic.consume'}

4. 从生产者端看来一切正常,依旧会收到来自节点B的confirm消息(basic.ack amqp方法)。

经过一番排查定位,上述现象实际上有两个坑在里面:

一、 在数据可靠性方面,镜像队列也不完全可靠。
二、 要保证消息可靠性,生产者端仅仅采用confirm机制还不够。

对于第一个坑:

第一感觉应该是bug,其他人也出现过。在云环境下要重现这个Bug很简单,申请两台云主机做RabbitMQ HA,部署镜像队列。另外申请两块云硬盘,分别挂载到两台云主机,作为数据盘为RabbitMQ提供元信息和队列数据提供存储(${RABBITMQ_HOME}/var/lib),模拟生产者和消费者对主节点(假设为节点A)进行生产消费,此时可以从云平台管理界面上卸载节点A上的云硬盘,观察节点B RabbitMQ WebUI,可以发现队列不见了。:)

换上最新版的3.5.1就没这个问题,但是要确认是哪个版本对它进行修复的却耗费了很久。初步翻阅了3.0.3 ~ 3.5.1之间大概20多个版本的Changelog,没有看到哪个bugfix的描述符合。只好用最笨的办法,对所有版本号采用二分法,挨个版本测试,最终将修复版本确定为3.4.0 (Changelog)。但是即便如此,也还是没有看到具体是哪个bugfix修复了此问题。

对于第二个坑:

对于队列不存在了,RabbitMQ依然向生产者返回confirm消息的情况,实际上通过生产者端的正确编程姿势可以避免。RabbitMQ官方有过说明:

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack.

也就是说,对于那些路由不可达的消息(根据routerkey,找不到相应队列),如果basic.publish方法没有设置mandatory参数(关于mandatory参数,可以参考『AMQP协议mandatory和immediate标志位区别』),那么RabbitMQ会直接丢弃消息,并且向客户端返回basic.ack消息;如果basic.publish方法设置了mandatory参数,那么RabbitMQ会通过basic.return消息返回消息内容,然后再发送basic.ack消息进行确认。

因为节点上的队列元信息在第一个踩的坑里已经丢失了,所以所有发送到此节点上的消息必然是不可到达的,加上未在basic.publish方法上加mandatory参数,因此消息直接被RabbitMQ丢弃,并被正常confirm掉。

要避免踩这个坑,Java SDK的Channel对象提供了addReturnListener()回调方法,当收到服务器端的basic.return消息时,ReturnListener方法被调用,生产者端可以进行消息重传。当然,前提是basicPublish()方法的mandatory参数已经设置为true。

--EOF--

RabbitMQ与Erlang

Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用),那么消息传递有没有开销?有,但是基本可以忽略,消息传递在单机上的本质就是内存复制,要知道DDR3 SDRAM的内存带宽约为16GB/s。

RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器,那么Erlang从语言层面带给RabbitMQ有哪些直接好处?

1. 高并发。
Erlang进程是完全由Erlang虚拟机进行调度和生命周期管理的一种数据结构,它与操作系统进程及线程完全没关系,也不存在数值上的什么对应关系。实际上,一个Erlang虚拟机对应一个操作系统进程,一个Erlang进程调度器对应一个操作系统线程,一般来说,有多少个CPU核就有多少个调度器。Erlang进程都非常轻量级,初始状态只包括几百个字节的PCB和233个字大小的私有堆栈,并且创建和销毁也非常轻量,都在微秒数量级。这些特征使得一个Erlang虚拟机里允许同时存在成千上万个进程,这些进程可以被公平地调度到各个CPU核上执行,因此可以在多核的场景下充分利用资源。

在RabbitMQ的进程模型里,AMQP概念里的channel和queue都被设计成了Erlang进程,从接受客户端连接请求开始,到消息最终持久化到磁盘,消息经过的进程链如下:
RabbitMQ Processes
上图中,tcp_acceptor进程用于接收客户端连接,然后初始化出rabbit_reader,rabbit_writer和rabbit_channel进程。rabbit_reader进程用于接收客户端数据,解析AMQP帧。rabbit_writer进程用于向客户端返回数据。rabbit_channel进程解析AMQP方法,然后进行消息路由等操作,是RabbitMQ的核心进程。rabbit_amqqueue_process是队列进程,rabbit_msg_store是负责进行消息持久化的进程,这两种类型进程都是RabbitMQ启动或者创建队列时创建的。从数量角度看,整个系统中存在,一个tcp_acceptor进程,一个rabbit_msg_store进程,多少个队列就有多少个rabbit_amqqueue_process进程,每条客户端连接对应一个rabbit_reader和rabbit_writer进程,至多对应65535个rabbit_channel进程。结合进程的数量,RabbitMQ的进程模型也可以描述如下图:
messages

RabbitMQ这种细粒度的进程模型正是得益于Erlang的高并发性。

2. 软实时。
Erlang的软实时特性可以从两方面看出。

首先是Erlang也是一门GC语言,但是Erlang的垃圾回收是以Erlang进程为粒度的。因为Erlang的消息传递和进程私有堆机制,使得按进程进行GC很容易实现,不必对一块内存或一个对象进行额外的引用计数。虽然对于单个进程来说,GC期间是“Stop The World”,但是前面也说过一个Erlang应用允许同时存在成千上万个进程,因此一个进程STW对于系统整体性能影响几乎微乎其微。另外,当进程需要销毁时,这个进程占用的所有内存可以直接回收,因为这块内存中的数据都是这个进程私有的。

另一方面,Erlang虚拟机对进程的调度采用的是抢占式策略。每个进程的运行周期都会分配到一定数量的reduction,当进程在进行外部模块函数调用,BIF调用,甚至算术运算都会减少reduction数量,当reduction数量减为0时,此进程交出CPU使用权,被其他进程抢占。相比于一些基于时间分片的软实时系统调度算法,reduction机制更加关注的是进程在执行期间能做多少事情,而不是时间上的绝对平均。

RabbitMQ将每个队列设计为一个Erlang进程,由于进程GC也是采用分代策略,当新老生代一起参与Major GC时,Erlang虚拟机会新开内存,根据root set将存活的对象拷贝至新空间,这个过程会造成新老内存空间同时存在,极端情况下,一个队列可能短期内需要两倍的内存占用量,因此设置内存流控阈值vm_memory_high_watermark时需要注意,默认的0.4就是一个保险的值,当超过0.5时,就有可能造成系统内存被瞬间吃完,RabbitMQ程序被系统OOM Killer杀掉。

3. 分布式。
Erlang可以说原生支持分布式,先看一段程序:

1
2
3
4
5
6
7
8
9
10
run(Node) -> 
    Pid = spawn(Node, fun ping/0), 
    Pid ! self(), 
    receive 
        ping -> ok 
    end. 
ping() ->
    receive
        From -> From ! ping 
    end.

上述程序演示的是一个分布式并发程序,运用了spawn/2,!,receive...end这三个并发原语。spawn/2用于创建进程,!用于异步发送消息,receive...end用于接收消息,注意spawn/2的第一个参数Node,它表示节点名称,这意味着对于应用来说,Pid ! Msg 就是将Msg消息发送到某一个Erlang进程,而无论这个进程是本地进程还是存在于远程的某个节点上,Erlang虚拟机会帮应用搞定一切底层通信机制。也就是说,物理节点分布式对上层Erlang应用来说是透明的。

这为实现RabbitMQ的集群和HA policy机制提供了极大的便利,主节点只要维护哪个是Pid(master),哪几个是slave_pids(slave)信息就行,根据不同的类型(publish和非publish),对队列操作进行replication。

4. 健壮性。
在Erlang的设计哲学里,有一个重要的概念就是“let it crash”。Erlang不提倡防御式编程,它认为程序既然遇到错误就应该让它崩溃,对于一个健壮的系统来说,崩溃不要紧,关键要重新起来。Erlang提供一种supervisor的行为模式,用于构建一棵健壮的进程监督树。监督者进程本身不包含业务逻辑,它只负责监控子进程,当子进程退出时,监督者进程可以根据一些策略将子进程重启。据说爱立信用Erlang写的AXD301交换机系统,可靠性为9个9,这意味着运行20年差不多有1秒的不可用时间,如此高的可靠性就是supervisor行为模式及其背后任其崩溃思想的极致体现(当然也离不开Erlang另外一个法宝代码热更新)。

在RabbitMQ里,supervisor行为模式运用得非常多,基本上每个worker进程都有相应的监督者进程,层层监督。比如下图所示的网络层进程监督树模型(已做过简化):
RabbitMQ Supervisor Tree

椭圆表示进程,矩形表示重启策略,one_for_all表示一个进程挂了其监督者进程的其他子进程也会被重启,比如一个rabbit_reader进程挂了,那么rabbit_channel_sup3进程也会重启,然后所有rabbit_channel根据AMQP协议协商后重新创建。simple_one_for_one则表示一种需要时再初始化的子进程重启策略,适用于一些动态添加子进程的场景,比如图中的rabbit_channel进程和tcp_acceptor进程。

--EOF--

RabbitMQ能打开的最大连接数

RabbitMQ自带了显示能够接受的最大连接数,有2种比较直观的方式:
1. rabbitmqctl命令。

1
2
3
4
5
6
7
8
9
10
11
12
n$ rabbitmqctl status
Status of node 'rabbit@10-101-17-13' ...
[{pid,23658},
 ......
 {file_descriptors,
     [{total_limit,924},
      {total_used,10},
      {sockets_limit,829},
      {sockets_used,10}]},
 ......
]
...done.

2. rabbitmq_management WebUI插件。

本文关注当RabbitMQ可用连接数耗尽时客户端的影响以及如何增加最大连接数默认值。

RabbitMQ的socket连接数(socket descriptors)是文件描述符(file descriptors,fd)的一个子集。也就是说,RabbitMQ能同时打开的最大连接数和最大文件句柄数(文件系统,管道)都是受限于操作系统关于文件描述符数量的设置,两者是此消彼长的关系。初始时,可用socket描述符与可用fd数量的比率大概在0.8-0.9左右,这个值并不固定,当socket描述符有剩余时,RabbitMQ会使用尽量多的文件描述符用于磁盘文件读写。随着服务器建立越来越多的socket连接,文件句柄开始回收,数量减少。总之,RabbitMQ会优先将文件描述符用于建立socket连接,宁可牺牲频繁打开/关闭文件带来的磁盘操作性能损耗,这种取舍很容易理解,作为网络服务器,当然优先保障网络吞吐率了。因此,对于高并发连接数的多队列读写时,队列性能会稍微差那么一点,比如用RabbitMQ做RPC。

当服务器建立的socket连接已经达到限制(sockets_limit)时,服务器不再接受新连接。这里要区分清楚,RabbitMQ不再接收的是AMQP连接,而不是传输层的TCP连接,通过简单抓包分析即可清楚流程:

1
2
3
4
5
6
7
8
$ sudo tcpdump host 10.101.17.13 and port 5672
17:24:12.214186 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [S], seq 3319779561, win 65535, options [mss 1368,nop,wscale 5,nop,nop,TS val 1006381554 ecr 0,sackOK,eol], length 0
17:24:12.214231 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [S.], seq 1636058035, ack 3319779562, win 14480, options [mss 1460,sackOK,TS val 24529834 ecr 1006381554,nop,wscale 5], length 0
17:24:12.218795 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [.], ack 1, win 4110, options [nop,nop,TS val 1006381560 ecr 24529834], length 0
17:24:12.243184 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [P.], seq 1:9, ack 1, win 4110, options [nop,nop,TS val 1006381583 ecr 24529834], length 8
17:24:12.243201 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [.], ack 9, win 453, options [nop,nop,TS val 24529841 ecr 1006381583], length 0
17:24:22.247907 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [F.], seq 9, ack 1, win 4110, options [nop,nop,TS val 1006391550 ecr 24529841], length 0
17:24:22.284914 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [.], ack 10, win 453, options [nop,nop,TS val 24532352 ecr 1006391550], length 0

line 2-4是TCP握手包,成功建立TCP连接。line 5开始客户端向服务器端发送AMQP协议头字符串“AMQP0091”,共8个字节,开始AMQP握手。line 6是服务器回给客户端的ack包,但未发送AMQP connection.start方法,导致客户端一直等到超时(line 7-8),发送FIN包关闭TCP连接。至此,AMQP连接建立失败。

从客户端(Java SDK)来看上述这个过程,客户端通过ConnectionFactory实例的newConnection()方法创建一条AMQP连接。在网络层,它首先通过java.net.Socket与服务器建立一条TCP连接,发送协议协商字符串“AMQP0091”,然后启动MainLoop线程,通过封装的Frame实例来循环读取帧(readFrame()),注意readFrame()方法可能会有一个SocketTimeoutException的超时异常,这个超时时间是由socket实例setSoTimeout方法写入,默认是10s,由AMQConnection.HANDSHAKE_TIMEOUT常量指定。当超时发生在AMQP连接握手阶段时,就抛出一个SocketTimeoutException异常,发生在其他阶段(除心跳超时)时,什么都不做继续下一个循环:

1
2
3
4
Caused by: java.net.SocketTimeoutException: Timeout during Connection negotiation
 at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection...
 at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:59)
 at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:541)

这里的socket读取超时很容易跟连接超时搞混。连接超时由ConnectionFactory实例的setConnectionTimeout()方法指定,对应着网络层Socket实例connect()方法中的timeout参数,指的是完成TCP三次握手的超时时间;而读取超时是从socket中读取字节流的等待时间,前文已经说过,由Socket实例的setSoTimeout()指定。这在各种网络库中应该很常见,比如HttpClient。

私以为这种情况下更好的设计是应该由RabbitMQ主动断开与客户端的TCP连接,减少客户端等待时间。

最后一个问题,如何增加RabbitMQ的能够同时打开的连接数。通过前文可知,最大并发连接数由此进程可打开的最大文件描述符数量(乘以一个比例系数)决定,因此只要增加单个进程可打开的文件描述符数量即可。有几个常规方法,按作用范围可以归纳为以下几类:

1. 进程级别。在启动脚本rabbitmq-server中加入ulimit -n 10240命令(假设将最大文件描述符数量设置为10240,下同),相当于在shell中执行,由此shell进程fork出来的进程都能继承这个配置。

2. 用户级别。修改/etc/security/limits.conf文件,添加以下配置,重新登录生效:

1
2
user    soft    nofile    10240
user    hard    nofile    10240

3. 系统级别。

1
# echo 10240 > /proc/sys/fs/file-max

上述设置只是针对proc文件系统,相当于修改了操作系统的运行时参数,重启后失效。要想永久生效,需要修改/etc/sysctl.conf文件,加入配置项fs.file-max=10240。

一个进程能打开的最大文件描述符数量受限于上述三个级别配置中的最小值。理论上,系统级别的配置数值必须要大于用户级别,用户级别的要大于进程级别的,只有这样配置才是安全的,否则进程容易因为打开文件数量问题受到来自操作系统的种种限制。操作系统为什么要限制可打开的文件描述符数量?为了系统安全。因为文件描述符本质上是一种内存中的数据结构,如果不加以限制,很容易被进程无意或恶意耗尽内存,比如fork bomb

--EOF--

RabbitMQ源码分析:Per-Connection流控机制

生产者生产消息速度过快,使得RabbitMQ进程来不及处理,容易造成进程信箱过大,或者使消息进入不同层次的物理队列,引起过多的处理开销。因此,RabbitMQ实现了一种流控机制来避免上述问题。

RabbitMQ流控机制的核心是一个称为{InitialCredit, MoreCreditAfter}的元组,默认情况下值为{200, 50}。假如消息发送者进程A要给接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住,对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A可以继续向B发送消息。

每个流控进程的进程字典中都维护着4类键:{credit_from, From}, {credit_to, To}, credit_blocked和credit_deferred,这些键值通过UPDATE宏来更新:

1
2
3
4
5
6
7
8
-define(UPDATE(Key, Default, Var, Expr),
        begin
            case get(Key) of
                undefined -> Var = Default;
                Var       -> ok
            end,
            put(Key, Expr)
        end).

其中,Key为键名,Default为初始值,Var为当前值,Expr为更新当前值的表达式。

{credit_from, From}的值表示还能向消息接收进程From发送多少条消息,当前进程会向多少个进程发送消息,进程字典中就有多少个{credit_from, From}键;{credit_to, To}的值表示当前进程再接收多少条消息,就要向消息发送进程增加Credit数量;credit_blocked的值表示当前进程被哪些进程block了,也就是禁止当前进程向其发送消息的进程列表,比如A向B发送消息,A的进程字典中{credit_from, B}值为0,那么,A的credit_blocked值为[B];credit_deferred缓存着消息接收进程向消息发送进程增加Credit的消息列表,它在当前进程被消息接收进程block时使用,当进程被unblock后(credit_blocked值为空),credit_deferred列表中的消息会依次发送。

RabbitMQ的credit_flow模块封装着流控相关的函数。

credit_flow:send/1和credit_flow:send/2函数在消息发送前调用,每发一条消息,更新{credit_from, From},使对应的Credit减1,当Credit数量为0时,调用block/1函数。

1
2
3
4
5
6
7
8
send(From) -> send(From, ?DEFAULT_CREDIT).
 
send(From, {InitialCredit, _MoreCreditAfter}) ->
    ?UPDATE({credit_from, From}, InitialCredit, C,
            if C == 1 -> block(From),
                         0;
               true   -> C - 1
            end).

credit_flow:block/1函数会更新credit_blocked值,把From对应的进程加到当前block进程列表中,表示当前进程不能再向block进程列表中的进程发送消息。

1
block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).

credit_flow:ack/2函数用于更新{credit_to, To}的值,每收到一条消息,消息接收进程ack一次,将{credit_to, To}值减1,当值减到1时,调用grant/2函数,给予消息发送进程MoreCreditAfter个Credit,使其可以继续发送消息。

1
2
3
4
5
6
7
8
ack(To) -> ack(To, ?DEFAULT_CREDIT).
 
ack(To, {_InitialCredit, MoreCreditAfter}) ->
    ?UPDATE({credit_to, To}, MoreCreditAfter, C,
            if C == 1 -> grant(To, MoreCreditAfter),
                         MoreCreditAfter;
               true   -> C - 1
            end).

credit_flow:grant/2函数可以给予消息发送进程指定数量的Credit,它会向目标进程发送一条{bump_credit, {self(), Quantity}}的普通消息,根据当前进程是否被block,grant/2函数会决定将bump_credit消息立即发送还是放入credit_deferred进程字典中缓存起来,等待时机合适再发送。

1
2
3
4
5
6
grant(To, Quantity) ->
    Msg = {bump_credit, {self(), Quantity}},
    case blocked() of
        false -> To ! Msg;
        true  -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
    end.

credit_flow:blocked/0函数用于判断当前进程是否被block,其判断依据是该进程进程字典中credit_blocked值是否为空,空表示未block,不为空表示其被某些进程block了。

1
2
3
4
5
blocked() -> case get(credit_blocked) of
                 undefined -> false;
                 []        -> false;
                 _         -> true
             end.

credit_flow:grant/2函数会向消息发送进程发送一条bump_credit消息,那么消息发送进程收到这条消息后,会调用credit_flow:handle_bump_msg/1函数进行处理,更新相应的{credit_from, From}值,增加Credit数量。

1
2
3
4
5
6
handle_bump_msg({From, MoreCredit}) ->
    ?UPDATE({credit_from, From}, 0, C,
            if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
                                                    C + MoreCredit;
               true                              -> C + MoreCredit
            end).

如果当前的Credit值小于0,并且加上收到的MoreCredit个Credit后值大于0,这种情况表示当前进程可以继续向From进程发送消息,进行一次unblock/1操作。

credit_flow:unblock/1函数用于将消息接收进程从当前进程的credit_blocked列表中删除。

1
2
3
4
5
6
7
8
9
unblock(From) ->
    ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
    case blocked() of
        false -> case erase(credit_deferred) of
                     undefined -> ok;
                     Credits   -> [To ! Msg || {To, Msg} <- Credits]
                 end;
        true  -> ok
    end.

如果unblock/1操作后当前进程的block列表为空,那么需要取出由credit_deferred缓存的消息列表,依次发送。

另外,为了避免消息接收进程或者消息发送进程挂掉引起其他进程永远被block,当检测到有进程挂掉后需要调用peer_down/1进行相关记录清理。

1
2
3
4
5
peer_down(Peer) ->
    unblock(Peer),
    erase({credit_from, Peer}),
    erase({credit_to, Peer}),
    ok.

RabbitMQ中,一条消息从接收到入队,在进程间的传递链路为rabbit_reader -> rabbit_channel -> rabbit_amqqueue_process -> rabbit_msg_store。进程之间构成一个有向无环图:
messages

这里以rabbit_reader -> rabbit_channel之间的消息流控为例说明。

rabbit_reader:process_frame/3函数解析帧,当该帧为一个包含内容的帧时,调用rabbit_channel:do_flow/3函数:

1
2
3
4
5
6
7
8
9
10
11
 
process_frame(Frame, Channel, State) ->
   ......
    case rabbit_command_assembler:process(Frame, AState) of
       ......
        {ok, Method, Content, NewAState} ->
            rabbit_channel:do_flow(ChPid, Method, Content),
            put(ChKey, {ChPid, NewAState}),
            post_process_frame(Frame, ChPid, control_throttle(State));
        ......
    end.

rabbit_channel:do_flow/3函数做的事情是调用credit_flow:send/1更新自己的{credit_from, Pid}值,然后将包含消息的帧发送给rabbit_channel进程(由参数Pid指定):

1
2
3
do_flow(Pid, Method, Content) ->
    credit_flow:send(Pid),
    gen_server2:cast(Pid, {method, Method, Content, flow}).

注意do_flow/3虽然属于rabbit_channel模块,但是它是在rabbit_reader:process_frame/3函数中被调用的,credit_flow:send/1修改的是rabbit_reader进程的进程字典,只有执行完gen_server2:cast/2函数后,消息才进入到rabbit_channel进程空间,rabbit_channel模块对此异步消息的处理函数为:

1
2
3
4
5
6
7
handle_cast({method, Method, Content, Flow},
            State = #ch{reader_pid = Reader}) ->
    case Flow of
        flow   -> credit_flow:ack(Reader);
        noflow -> ok
    end,
    ......

rabbit_channel进程每收到一条消息,就调用一次credit_flow:ack/1函数,当进程字典中的{credit_to, Reader}值为0时,向rabbit_reader进程发送bump_credit消息,此消息由rabbit_reader进程的handle_other({bump_credit, Msg}, State)函数进行处理。

1
2
3
handle_other({bump_credit, Msg}, State) ->
    credit_flow:handle_bump_msg(Msg),
    control_throttle(State);

流控机制的基本原理就是这样,但是讲到现在,都还没有讲到RabbitMQ是怎么阻塞客户端的数据请求,现在以一个具体场景为例说明:

在上面的消息传递链中,假设rabbit_msg_store进程(Pid为StorePid)来不及处理rabbit_amqqueue_process进程(Pid为AMQPid)过来的消息,根据流控机制,慢慢地rabbit_amqqueue_process进程字典中的{credit_from, StorePid}值变为0,credit_blocked值变为[StorePid],此时如果有消息从rabbit_channel发送到rabbit_amqqueue_process进程,rabbit_amqqueue_process进程还是会将消息源源不断地发送到rabbit_msg_store进程。咋看之下,流控并没有起作用,实际上,此时rabbit_channel进程字典中的{credit_from, AMQPid}值也会慢慢减少,并且,rabbit_channel进程不会再收到来自rabbit_amqqueue_process进程的bump_credit消息,因为rabbit_amqqueue_process的credit_blocked列表不为空,它会将待发的{bump_credit, {self(), Quantity}}消息缓存在credit_deferred列表中!逐渐,rabbit_channel进程字典中的credit_blocked列表变为[AMQPid],rabbit_channel进程无法向其上游的rabbit_reader进程发送bump_credit消息(全都缓存在自己进程字典的credit_deferred里)。到最后,最上游的rabbit_reader进程字典中credit_blocked列表不为空,在此进程中调用credit_flow:blocked/0返回true。回到rabbit_reader进程,每处理完一条包含内容的帧时(rabbit_channel:do_flow/3),都会调用control_throttle/1函数,如果credit_flow:blocked/0返回true,会将当前的连接状态置为blocking:{running, true} -> State#v1{connection_state = blocking},随即,连接状态会转变为blocked。这时,当rabbit_reader继续接受客户端数据时,就会进入recvloop/2函数子句:

1
2
recvloop(Deb, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, State);

最终进程阻塞在mainloop/2的rabbit_net:recv/1函数上。rabbit_net:recv/1函数会阻塞的原因是RabbitMQ采用了gen_tcp的半阻塞模型,也就是说每次接受一个tcp消息之后,必须显式调用inet:setopts(Sock, [{active, once}])来激活一下,否则,进程会一直阻塞在receive语句上。

以上就是假设rabbit_msg_store处理速度跟不上,最终导致rabbit_reader进程停止接收客户端数据的流控机制作用的过程。根据实现原理,消息链进程所构成的有向无环图中,任何一条边触发流控机制,最终都会导致这条连接停止接收客户端数据。

注:RabbitMQ源码为3.1.5。

Reference:
[1]. Alvaro Videla - RABBITMQ INTERNALS - CREDIT FLOW FOR ERLANG PROCESSES.
[2]. RabbitMQ流量控制机制分析.docx

--EOF--