月度归档:2014年03月

erlang supervisor行为模式

supervisor是Erlang OTP中的监督者行为模式,它可以启动多个子进程,每个子进程也是一个supervisor或者worker。Erlang通过supervisor行为模式可以构建一棵树形结构的监督者进程树,实现一个可容错的系统。本文仅对supervisor行为模式的使用做一个简单的记录,点到为止,更详细的介绍参阅官方文档:supervisor模块supervisor行为模式

supervisor行为模式的用户回调函数为init/1,它的返回值如下:

1
{ok,{{RestartStrategy,MaxR,MaxT},[ChildSpec]}}

其中,RestartStrategy为子进程的重启策略,有以下四种可选项:
1. one_for_all. 一个子进程挂了,所有子进程都重启。
2. one_for_one. 一个子进程挂了,只有这个子进程重启。
3. rest_for_one. 一个子进程挂了,这个子进程及启动时间在这个进程之后的子进程均需要重启。
4. simple_one_for_one. 从策略上来说也是一个子进程挂了,只有这个子进程会重启。但是这个选项只适用于所有子进程都是相同类型的,并且动态添加到监督树中的场景。

MaxR和MaxT的意义是规定了重启子进程的尝试次数,它表示MaxT秒之内不得重启超过MaxR次,这是为了避免无意义的无限循环崩溃。当MaxR取值为0时,表示不采用重启策略,子进程挂了就挂了。

ChildSpec定义子进程规范,完整规范如下:

1
{Id, StartFunc, Restart, Shutdown, Type, Modules}

其中比较重要的属性为:
StartFunc: {M,F,A},表示子进程的启动函数,子进程就是通过调用StartFunc创建出来的。
Restart: 定义了子进程的重启规则,它有三个选项,permanent表示总是重启,transient表示异常才重启,temporary表示不重启。Restart和上文的RestartStrategy应注意区分,它们从两个不同的角度定义了子进程重启策略,Restart针对了单个子进程的重启策略,而RestartStrategy针对的是子进程间的相互影响。
Shutdown: 定义了子进程如何被关闭。有强制关闭(brutal_kill)和优雅关闭(int()>0, infinity)等选项。
Type: 定义子进程是worker类型或是supervisor类型。

以上的init/1回调函数返回值规范所有选项最终以“与”的逻辑关系作用于子进程,以重启策略为例:如果RestartStrategy为one_for_all,MaxR不为0,子进程规范A的Restart为permanent,B的Restart为temporary,那么当A异常退出时,监督者进程只会重启A,而不会重启B。而上述条件中把MaxR改为0后,那么A,B均不重启。

除了用户模块的init/1回调函数返回值规范之外,学习supervisor行为模式还需了解两个很重要的模块内函数(当然其他也重要,但是这两个用得广泛,必须理解):
1. start_link/2,3:
当外部模块调用supervisor:start_link/2,3启动一棵监督树时,supervisor模块会回调用户模块的init/1函数,完成自身进程和子进程的创建。supervisor行为模式本身是通过gen_server行为模式实现,supervisor:start_link/2,3内部调用gen_server:start_link/2,3创建一个监督者进程,当进入到superver模块的回调函数init/1(此init/1函数是gen_server行为模式的回调函数)后,意味着此时监督者进程已经创建完毕。然后紧接着它将自己提升为系统进程(通过process_flag(trap_exit, true)实现),并依次按列表从左至右调用子进程规范实现子进程的创建,注意子进程的创建是通过子进程规范里的启动函数{M,F,A}实现的,一般{M,F,A}里直接(worker进程)或间接(supervisor进程)调用gen_server:start_link/2,3,而不是在supervisor模块init/1函数中spawn出进程来apply MFA实现。

2. start_child/2: start_child/2函数接受两个参数SupRef和ChildSpec,前者SupRef表示监督者进程,后者ChildSpec表示子进程规范,其定义相当诡异。当监督者进程的重启策略为simple_one_for_one时,那么ChildSpec是一个列表L,然后监督者进程通过apply(M, F, A++L)的方式启动子进程,这里的MFA即init/1回调函数子进程规范中的StartFunc。另外,simple_one_for_one类型重启策略下,子进程不会通过init/1创建,而只能通过start_child/2动态添加;当监督者进程的重启策略为其他类型(非simple_one_for_one)时,ChildSpec同上文的子进程规范定义,是一个元组复合类型。RabbitMQ中就有大量使用了start_child/2函数来启动子进程,通过这种方式启动子进程更加灵活。顺便也借此吐槽一下Erlang的接口设计,ChildSpec参数忽而元组类型,忽而是简单列表类型,而且同一个参数含义差异极大,这样的设计有滥用动态类型特性的嫌疑。

--EOF--

『崔健在一无所有中呐喊 —— 中国摇滚备忘录』

『崔健在一无所有中呐喊 —— 中国摇滚备忘录』好久没这么酣畅淋漓地看过一本书了,它从世界范围内摇滚乐的起源,崛起和繁荣,结合中国国情,解释了崔健红火的原因及其在中国摇滚乐发展史上的历史地位。从各方面来看,『崔健在一无所有中呐喊』本身作为一部报告文学作品其质量相当高,我们看到了一个立体的崔健,他有优点和缺点,也看清了成就崔健的这个时代,用行话说,就是“干货很多”。

『台湾流行音乐200最佳专辑』曾经这样评价崔健的『一无所有』专辑:“这张专辑是一把刀子,把中国的流行音乐史切成了‘崔健前’和‘崔健后’”。崔健所处的,是一个压抑,虚无,信仰缺失,精神饥渴的时代,人们太需要这样的声音了,愤怒、叛逆、质疑……崔健的演出就像一团烈火,过境之处,一片荒芜,然后生命萌芽和意识觉醒,北京,武汉,西安,成都,广州、厦门……概不例外。透过作者的描述可知,人们对崔健产生了宗教式的狂热,这份狂热已经模糊了崔健和摇滚乐的界限,分不清台下人释放的是对崔健的热爱,还是对摇滚乐的痴迷,亦或是自己内心压抑已久的自由渴望。近十年来,超女和周杰伦都曾是轰动一时的社会现象,但是归根到底,还只是人们在进行一次快餐式消费,最多只是一种饭后谈资。然而,回到80年代末,崔健热的背后可是一大群迷惘的人们的精神寄托,所以不乏有女歌迷甘愿献身,对她们来说,是把身体献给了自由和理想。

崔健和他的『一无所有』风靡一时,可以说开创了中国摇滚乐时代,开启了中国流行乐新纪元,我对其背后的原因比较感兴趣,好在作者已有分析,这也是全书最精华的部分。凡是追溯一个社会现象,或者一个历史事件的背后成因,无外乎外因和内因两个方面。这里的外因是整个全世界摇滚乐的发展浪潮,远的比如哥白尼,伽利略对人们观念改变的促进作用就不说了,自从19世纪末尼采发出“上帝死了”的呼声之后,西方逐渐进入了一个信仰崩塌的时代,传统价值观日薄西山。半个世纪后,摇滚乐作为一种流行文化艺术在美国发展起来,随后传到欧洲,走向世界,从50年代的“猫王”Elvis Presley开始,60年代以滚石和披头士乐队为代表,摇滚乐中的反叛情绪达到高峰,同时,Bob Dylan在歌曲中表达反战,以此将摇滚乐与政治挂上钩。从70年代开始,摇滚乐开始出现分支,首先是在精神上背离摇滚宗旨,只注重娱乐和享乐的迪斯科开始流行,接着是充满反对摇滚商品化意味的朋克乐也是欣欣向荣,这之后的80年代,朋克乐进一步以重金属的形式得到再次爆发。到了80年代末,摇滚乐已经成了西方人表达情绪和观点的重要媒介,开始作为一种社会文化力量对社会发展产生直接影响,以Pink Floyd纪念柏林墙倒塌的『The Wall』群星演唱会为典型代表。而这个时候的中国呢?中国的文艺在五四运动之后曾出现过短暂的繁荣,30年代中国的流行乐以时代曲的形式在上海生根发芽,但此时人们的观念还是被束缚和压抑,两千年的儒家文化,中庸之道,周礼之术仍旧根深蒂固。建国以后的反右运动,“大跃进”,以及十年文革,则是彻底荒芜了人们的精神世界,等到十一届三中全会解放思想之后,物质生活开始改善,人们发现到了自己心中压抑的情绪,这个时候,崔健带着摇滚乐,吼着『一无所有』适时地出现了。

如果以马克思主义学者历史唯物主义的眼光看待崔健现象的话,可以发现其中巧妙的必然性和偶然性,必然性在于时机已经成熟,中国摇滚乐的时代已经来临,无非是等待崔健、李健或是张健来打开这扇门;偶然性在于崔健的成名,『一无所有』歌曲本身只是一首非常普通的情歌,崔健坦言自己创作的出发点也没有大家想象的那么深刻和宏大,并且,他本人也表示不想将音乐与政治扯上关系,但是即便如此,『一无所有』还是被演绎为了一首时代的悲歌,并且成为了中国摇滚乐的开山之作。

崔健赋予了歌曲力量,带着深深的时代印记,来感受下吧:

    那天是你用一块红布
    蒙住我双眼也蒙住了天
    你问我看见了什么
    我说我看见了幸福
    这个感觉真让我舒服
    它让我忘掉我没地儿住
    ……

--EOF--

RabbitMQ源码分析:Per-Connection流控机制

生产者生产消息速度过快,使得RabbitMQ进程来不及处理,容易造成进程信箱过大,或者使消息进入不同层次的物理队列,引起过多的处理开销。因此,RabbitMQ实现了一种流控机制来避免上述问题。

RabbitMQ流控机制的核心是一个称为{InitialCredit, MoreCreditAfter}的元组,默认情况下值为{200, 50}。假如消息发送者进程A要给接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住,对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A可以继续向B发送消息。

每个流控进程的进程字典中都维护着4类键:{credit_from, From}, {credit_to, To}, credit_blocked和credit_deferred,这些键值通过UPDATE宏来更新:

1
2
3
4
5
6
7
8
-define(UPDATE(Key, Default, Var, Expr),
        begin
            case get(Key) of
                undefined -> Var = Default;
                Var       -> ok
            end,
            put(Key, Expr)
        end).

其中,Key为键名,Default为初始值,Var为当前值,Expr为更新当前值的表达式。

{credit_from, From}的值表示还能向消息接收进程From发送多少条消息,当前进程会向多少个进程发送消息,进程字典中就有多少个{credit_from, From}键;{credit_to, To}的值表示当前进程再接收多少条消息,就要向消息发送进程增加Credit数量;credit_blocked的值表示当前进程被哪些进程block了,也就是禁止当前进程向其发送消息的进程列表,比如A向B发送消息,A的进程字典中{credit_from, B}值为0,那么,A的credit_blocked值为[B];credit_deferred缓存着消息接收进程向消息发送进程增加Credit的消息列表,它在当前进程被消息接收进程block时使用,当进程被unblock后(credit_blocked值为空),credit_deferred列表中的消息会依次发送。

RabbitMQ的credit_flow模块封装着流控相关的函数。

credit_flow:send/1和credit_flow:send/2函数在消息发送前调用,每发一条消息,更新{credit_from, From},使对应的Credit减1,当Credit数量为0时,调用block/1函数。

1
2
3
4
5
6
7
8
send(From) -> send(From, ?DEFAULT_CREDIT).
 
send(From, {InitialCredit, _MoreCreditAfter}) ->
    ?UPDATE({credit_from, From}, InitialCredit, C,
            if C == 1 -> block(From),
                         0;
               true   -> C - 1
            end).

credit_flow:block/1函数会更新credit_blocked值,把From对应的进程加到当前block进程列表中,表示当前进程不能再向block进程列表中的进程发送消息。

1
block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).

credit_flow:ack/2函数用于更新{credit_to, To}的值,每收到一条消息,消息接收进程ack一次,将{credit_to, To}值减1,当值减到1时,调用grant/2函数,给予消息发送进程MoreCreditAfter个Credit,使其可以继续发送消息。

1
2
3
4
5
6
7
8
ack(To) -> ack(To, ?DEFAULT_CREDIT).
 
ack(To, {_InitialCredit, MoreCreditAfter}) ->
    ?UPDATE({credit_to, To}, MoreCreditAfter, C,
            if C == 1 -> grant(To, MoreCreditAfter),
                         MoreCreditAfter;
               true   -> C - 1
            end).

credit_flow:grant/2函数可以给予消息发送进程指定数量的Credit,它会向目标进程发送一条{bump_credit, {self(), Quantity}}的普通消息,根据当前进程是否被block,grant/2函数会决定将bump_credit消息立即发送还是放入credit_deferred进程字典中缓存起来,等待时机合适再发送。

1
2
3
4
5
6
grant(To, Quantity) ->
    Msg = {bump_credit, {self(), Quantity}},
    case blocked() of
        false -> To ! Msg;
        true  -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
    end.

credit_flow:blocked/0函数用于判断当前进程是否被block,其判断依据是该进程进程字典中credit_blocked值是否为空,空表示未block,不为空表示其被某些进程block了。

1
2
3
4
5
blocked() -> case get(credit_blocked) of
                 undefined -> false;
                 []        -> false;
                 _         -> true
             end.

credit_flow:grant/2函数会向消息发送进程发送一条bump_credit消息,那么消息发送进程收到这条消息后,会调用credit_flow:handle_bump_msg/1函数进行处理,更新相应的{credit_from, From}值,增加Credit数量。

1
2
3
4
5
6
handle_bump_msg({From, MoreCredit}) ->
    ?UPDATE({credit_from, From}, 0, C,
            if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
                                                    C + MoreCredit;
               true                              -> C + MoreCredit
            end).

如果当前的Credit值小于0,并且加上收到的MoreCredit个Credit后值大于0,这种情况表示当前进程可以继续向From进程发送消息,进行一次unblock/1操作。

credit_flow:unblock/1函数用于将消息接收进程从当前进程的credit_blocked列表中删除。

1
2
3
4
5
6
7
8
9
unblock(From) ->
    ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
    case blocked() of
        false -> case erase(credit_deferred) of
                     undefined -> ok;
                     Credits   -> [To ! Msg || {To, Msg} <- Credits]
                 end;
        true  -> ok
    end.

如果unblock/1操作后当前进程的block列表为空,那么需要取出由credit_deferred缓存的消息列表,依次发送。

另外,为了避免消息接收进程或者消息发送进程挂掉引起其他进程永远被block,当检测到有进程挂掉后需要调用peer_down/1进行相关记录清理。

1
2
3
4
5
peer_down(Peer) ->
    unblock(Peer),
    erase({credit_from, Peer}),
    erase({credit_to, Peer}),
    ok.

RabbitMQ中,一条消息从接收到入队,在进程间的传递链路为rabbit_reader -> rabbit_channel -> rabbit_amqqueue_process -> rabbit_msg_store。进程之间构成一个有向无环图:
messages

这里以rabbit_reader -> rabbit_channel之间的消息流控为例说明。

rabbit_reader:process_frame/3函数解析帧,当该帧为一个包含内容的帧时,调用rabbit_channel:do_flow/3函数:

1
2
3
4
5
6
7
8
9
10
11
 
process_frame(Frame, Channel, State) ->
   ......
    case rabbit_command_assembler:process(Frame, AState) of
       ......
        {ok, Method, Content, NewAState} ->
            rabbit_channel:do_flow(ChPid, Method, Content),
            put(ChKey, {ChPid, NewAState}),
            post_process_frame(Frame, ChPid, control_throttle(State));
        ......
    end.

rabbit_channel:do_flow/3函数做的事情是调用credit_flow:send/1更新自己的{credit_from, Pid}值,然后将包含消息的帧发送给rabbit_channel进程(由参数Pid指定):

1
2
3
do_flow(Pid, Method, Content) ->
    credit_flow:send(Pid),
    gen_server2:cast(Pid, {method, Method, Content, flow}).

注意do_flow/3虽然属于rabbit_channel模块,但是它是在rabbit_reader:process_frame/3函数中被调用的,credit_flow:send/1修改的是rabbit_reader进程的进程字典,只有执行完gen_server2:cast/2函数后,消息才进入到rabbit_channel进程空间,rabbit_channel模块对此异步消息的处理函数为:

1
2
3
4
5
6
7
handle_cast({method, Method, Content, Flow},
            State = #ch{reader_pid = Reader}) ->
    case Flow of
        flow   -> credit_flow:ack(Reader);
        noflow -> ok
    end,
    ......

rabbit_channel进程每收到一条消息,就调用一次credit_flow:ack/1函数,当进程字典中的{credit_to, Reader}值为0时,向rabbit_reader进程发送bump_credit消息,此消息由rabbit_reader进程的handle_other({bump_credit, Msg}, State)函数进行处理。

1
2
3
handle_other({bump_credit, Msg}, State) ->
    credit_flow:handle_bump_msg(Msg),
    control_throttle(State);

流控机制的基本原理就是这样,但是讲到现在,都还没有讲到RabbitMQ是怎么阻塞客户端的数据请求,现在以一个具体场景为例说明:

在上面的消息传递链中,假设rabbit_msg_store进程(Pid为StorePid)来不及处理rabbit_amqqueue_process进程(Pid为AMQPid)过来的消息,根据流控机制,慢慢地rabbit_amqqueue_process进程字典中的{credit_from, StorePid}值变为0,credit_blocked值变为[StorePid],此时如果有消息从rabbit_channel发送到rabbit_amqqueue_process进程,rabbit_amqqueue_process进程还是会将消息源源不断地发送到rabbit_msg_store进程。咋看之下,流控并没有起作用,实际上,此时rabbit_channel进程字典中的{credit_from, AMQPid}值也会慢慢减少,并且,rabbit_channel进程不会再收到来自rabbit_amqqueue_process进程的bump_credit消息,因为rabbit_amqqueue_process的credit_blocked列表不为空,它会将待发的{bump_credit, {self(), Quantity}}消息缓存在credit_deferred列表中!逐渐,rabbit_channel进程字典中的credit_blocked列表变为[AMQPid],rabbit_channel进程无法向其上游的rabbit_reader进程发送bump_credit消息(全都缓存在自己进程字典的credit_deferred里)。到最后,最上游的rabbit_reader进程字典中credit_blocked列表不为空,在此进程中调用credit_flow:blocked/0返回true。回到rabbit_reader进程,每处理完一条包含内容的帧时(rabbit_channel:do_flow/3),都会调用control_throttle/1函数,如果credit_flow:blocked/0返回true,会将当前的连接状态置为blocking:{running, true} -> State#v1{connection_state = blocking},随即,连接状态会转变为blocked。这时,当rabbit_reader继续接受客户端数据时,就会进入recvloop/2函数子句:

1
2
recvloop(Deb, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, State);

最终进程阻塞在mainloop/2的rabbit_net:recv/1函数上。rabbit_net:recv/1函数会阻塞的原因是RabbitMQ采用了gen_tcp的半阻塞模型,也就是说每次接受一个tcp消息之后,必须显式调用inet:setopts(Sock, [{active, once}])来激活一下,否则,进程会一直阻塞在receive语句上。

以上就是假设rabbit_msg_store处理速度跟不上,最终导致rabbit_reader进程停止接收客户端数据的流控机制作用的过程。根据实现原理,消息链进程所构成的有向无环图中,任何一条边触发流控机制,最终都会导致这条连接停止接收客户端数据。

注:RabbitMQ源码为3.1.5。

Reference:
[1]. Alvaro Videla - RABBITMQ INTERNALS - CREDIT FLOW FOR ERLANG PROCESSES.
[2]. RabbitMQ流量控制机制分析.docx

--EOF--