标签归档:Erlang

Erlang进程GC引发RabbitMQ Crash

生产环境下曾数次出现过RabbitMQ异常崩溃的场景,好在镜像队列的部署方式对服务未产生影响。

我们的RabbitMQ部署环境是:

KVM: 2 VCPU, x86_64, 4G RAM, swap disabled。
OS: Debian 7.0, Linux Kernel 3.2.0-4-amd64
Erlang: R15B01 (erts-5.9.1) [64-bit] [smp:2:2]
RabbitMQ: 3.1.5, {vm_memory_high_watermark,0.4}, {vm_memory_limit,1663611699}

分析最近一次崩溃产生的erl_crash.dump文件:

=erl_crash_dump:0.1
Tue Jun 16 00:46:49 2015
Slogan: eheap_alloc: Cannot allocate 1824525600 bytes of memory (of type "old_heap").
System version: Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:2:2] [async-threads:30] [kernel-poll:true]
Compiled: Sun Jan 27 18:19:34 2013
Taints:
Atoms: 22764
=memory
total: 1980253120
processes: 1563398383
processes_used: 1563397555
system: 416854737
atom: 703377
atom_used: 674881
binary: 385608216
code: 18475052
ets: 7643488
......

发现Crash原因是:Cannot allocate 1824525600 bytes of memory (of type "old_heap")。从数据来看,Erlang虚拟机已占用约1.98G内存(其中分配给Erlang进程的占1.56G),此时仍向操作系统申请1.82G,因为操作系统本身以及其他服务也占用一些内存,当前系统已经分不出足够的内存了,所以Erlang虚拟机崩溃。

此处有两个不符合预期的数据:
1. vm_memory_limit控制在1.67G左右,为什么崩溃时显示占用了1.98G?
2. 为什么Erlang虚拟机会额外再申请1.82G内存?

经过一些排查和总结,发现几次崩溃都是出现在队列中消息堆积较多触发了流控或者有大量unack消息requeue操作的时候,基本把原因确定为Erlang对进程进行Major GC时系统内存不足上。

在之前的『RabbitMQ与Erlang』一文中,曾简单介绍过Erlang的软实时特性,其中按进程GC是实现软实时的一个重要手段:

RabbitMQ将每个队列设计为一个Erlang进程,由于进程GC也是采用分代策略,当新老生代一起参与Major GC时,Erlang虚拟机会新开内存,根据root set将存活的对象拷贝至新空间,这个过程会造成新老内存空间同时存在,极端情况下,一个队列可能短期内需要两倍的内存占用量。

这也是RabbitMQ将内存流控的安全阈值设置为0.4的原因,即使double,也是0.8,还是安全的。0.4的意思是,当broker的内存使用量大于40%时,开始进行生产者流控,但是该参数并不承诺broker的内存使用率不大于40%。官方文档也强调过这一点:

The default memory threshold is set to 40% of installed RAM. Note that this does not prevent the RabbitMQ server from using more than 40%, it is merely the point at which publishers are throttled. Erlang's garbage collector can, in the worst case, cause double the amount of memory to be used (by default, 80% of RAM).

实际上通过RabbitMQ运行日志很容易证实,broker的实际内存使用量可以远远大于vm_memory_high_watermark设定值,比如:

$ less /var/log/rabbitmq/rabbit@10.xx.xx.xx.log
=INFO REPORT==== 16-Jun-2015::00:46:25 ===
vm_memory_high_watermark set. Memory used:2497352280 allowed:1663611699
=WARNING REPORT==== 16-Jun-2015::00:46:25 ===
memory resource limit alarm set on node 'rabbit@10.xx.xx.xx'.

内存流控阈值掐在1.6G,但是实际使用了近2.5G!再加上操作系统本身和其他服务吃掉的内存,轻松超过3G。如果此时触发Erlang进程Major GC,需要占用双倍的当前堆内存大小,那么报本文开头的old_heap堆内存分配错误也就不足为奇了。

知道了问题所在,如何解决这个问题呢?

遗憾的是,我目前也没有确切的解决方法(如有人知道,烦请告知)。但是从问题的形成原因来看,至少从以下几个方面可以显著降低问题出现概率。

1. RabbitMQ独立部署,不与其他Server共享内存资源。
2. 进一步降低vm_memory_high_watermark值,比如设置成0.3,但是这种方式会造成内存资源利用率太低。
3. 开启swap,问题在于容易造成性能和吞吐量恶化。
4. 升级RabbitMQ至新版(3.4+)。我个人觉得这个问题的根本原因在于RabbitMQ对内存的管理上,特别是早期版本,Matthew(RabbitMQ作者之一)也曾谈到过这个问题。尽管设计之初就考虑了各种优化,使得队列进程私有堆尽量小,比如当触发了某些条件,会page out到磁盘,或以二进制数据的方式存储在共享数据区,但是即便如此,在某些特定时刻,队列进程私有堆仍会消耗大量内存,这点从Management Plugin或者Remote shell里都可以看出来,而每次的“Cannot allocate old_heap”问题恰恰也总是出现在这些时刻。RabbitMQ 3.4版本发布后,官方发布了一篇新版内存管理方面的博客,从介绍上来看,要准确计算和控制当前系统使用的内存量确实非常困难。所以作者的感慨有理:

Of course, this still doesn't give a perfect count of how much memory is in use by a queue; such a thing is probably impossible in a dynamic system. But it gets us much closer.

--EOF--

RabbitMQ与Erlang

Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用),那么消息传递有没有开销?有,但是基本可以忽略,消息传递在单机上的本质就是内存复制,要知道DDR3 SDRAM的内存带宽约为16GB/s。

RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器,那么Erlang从语言层面带给RabbitMQ有哪些直接好处?

1. 高并发。
Erlang进程是完全由Erlang虚拟机进行调度和生命周期管理的一种数据结构,它与操作系统进程及线程完全没关系,也不存在数值上的什么对应关系。实际上,一个Erlang虚拟机对应一个操作系统进程,一个Erlang进程调度器对应一个操作系统线程,一般来说,有多少个CPU核就有多少个调度器。Erlang进程都非常轻量级,初始状态只包括几百个字节的PCB和233个字大小的私有堆栈,并且创建和销毁也非常轻量,都在微秒数量级。这些特征使得一个Erlang虚拟机里允许同时存在成千上万个进程,这些进程可以被公平地调度到各个CPU核上执行,因此可以在多核的场景下充分利用资源。

在RabbitMQ的进程模型里,AMQP概念里的channel和queue都被设计成了Erlang进程,从接受客户端连接请求开始,到消息最终持久化到磁盘,消息经过的进程链如下:
RabbitMQ Processes
上图中,tcp_acceptor进程用于接收客户端连接,然后初始化出rabbit_reader,rabbit_writer和rabbit_channel进程。rabbit_reader进程用于接收客户端数据,解析AMQP帧。rabbit_writer进程用于向客户端返回数据。rabbit_channel进程解析AMQP方法,然后进行消息路由等操作,是RabbitMQ的核心进程。rabbit_amqqueue_process是队列进程,rabbit_msg_store是负责进行消息持久化的进程,这两种类型进程都是RabbitMQ启动或者创建队列时创建的。从数量角度看,整个系统中存在,一个tcp_acceptor进程,一个rabbit_msg_store进程,多少个队列就有多少个rabbit_amqqueue_process进程,每条客户端连接对应一个rabbit_reader和rabbit_writer进程,至多对应65535个rabbit_channel进程。结合进程的数量,RabbitMQ的进程模型也可以描述如下图:
messages

RabbitMQ这种细粒度的进程模型正是得益于Erlang的高并发性。

2. 软实时。
Erlang的软实时特性可以从两方面看出。

首先是Erlang也是一门GC语言,但是Erlang的垃圾回收是以Erlang进程为粒度的。因为Erlang的消息传递和进程私有堆机制,使得按进程进行GC很容易实现,不必对一块内存或一个对象进行额外的引用计数。虽然对于单个进程来说,GC期间是“Stop The World”,但是前面也说过一个Erlang应用允许同时存在成千上万个进程,因此一个进程STW对于系统整体性能影响几乎微乎其微。另外,当进程需要销毁时,这个进程占用的所有内存可以直接回收,因为这块内存中的数据都是这个进程私有的。

另一方面,Erlang虚拟机对进程的调度采用的是抢占式策略。每个进程的运行周期都会分配到一定数量的reduction,当进程在进行外部模块函数调用,BIF调用,甚至算术运算都会减少reduction数量,当reduction数量减为0时,此进程交出CPU使用权,被其他进程抢占。相比于一些基于时间分片的软实时系统调度算法,reduction机制更加关注的是进程在执行期间能做多少事情,而不是时间上的绝对平均。

RabbitMQ将每个队列设计为一个Erlang进程,由于进程GC也是采用分代策略,当新老生代一起参与Major GC时,Erlang虚拟机会新开内存,根据root set将存活的对象拷贝至新空间,这个过程会造成新老内存空间同时存在,极端情况下,一个队列可能短期内需要两倍的内存占用量,因此设置内存流控阈值vm_memory_high_watermark时需要注意,默认的0.4就是一个保险的值,当超过0.5时,就有可能造成系统内存被瞬间吃完,RabbitMQ程序被系统OOM Killer杀掉。

3. 分布式。
Erlang可以说原生支持分布式,先看一段程序:

1
2
3
4
5
6
7
8
9
10
run(Node) -> 
    Pid = spawn(Node, fun ping/0), 
    Pid ! self(), 
    receive 
        ping -> ok 
    end. 
ping() ->
    receive
        From -> From ! ping 
    end.

上述程序演示的是一个分布式并发程序,运用了spawn/2,!,receive...end这三个并发原语。spawn/2用于创建进程,!用于异步发送消息,receive...end用于接收消息,注意spawn/2的第一个参数Node,它表示节点名称,这意味着对于应用来说,Pid ! Msg 就是将Msg消息发送到某一个Erlang进程,而无论这个进程是本地进程还是存在于远程的某个节点上,Erlang虚拟机会帮应用搞定一切底层通信机制。也就是说,物理节点分布式对上层Erlang应用来说是透明的。

这为实现RabbitMQ的集群和HA policy机制提供了极大的便利,主节点只要维护哪个是Pid(master),哪几个是slave_pids(slave)信息就行,根据不同的类型(publish和非publish),对队列操作进行replication。

4. 健壮性。
在Erlang的设计哲学里,有一个重要的概念就是“let it crash”。Erlang不提倡防御式编程,它认为程序既然遇到错误就应该让它崩溃,对于一个健壮的系统来说,崩溃不要紧,关键要重新起来。Erlang提供一种supervisor的行为模式,用于构建一棵健壮的进程监督树。监督者进程本身不包含业务逻辑,它只负责监控子进程,当子进程退出时,监督者进程可以根据一些策略将子进程重启。据说爱立信用Erlang写的AXD301交换机系统,可靠性为9个9,这意味着运行20年差不多有1秒的不可用时间,如此高的可靠性就是supervisor行为模式及其背后任其崩溃思想的极致体现(当然也离不开Erlang另外一个法宝代码热更新)。

在RabbitMQ里,supervisor行为模式运用得非常多,基本上每个worker进程都有相应的监督者进程,层层监督。比如下图所示的网络层进程监督树模型(已做过简化):
RabbitMQ Supervisor Tree

椭圆表示进程,矩形表示重启策略,one_for_all表示一个进程挂了其监督者进程的其他子进程也会被重启,比如一个rabbit_reader进程挂了,那么rabbit_channel_sup3进程也会重启,然后所有rabbit_channel根据AMQP协议协商后重新创建。simple_one_for_one则表示一种需要时再初始化的子进程重启策略,适用于一些动态添加子进程的场景,比如图中的rabbit_channel进程和tcp_acceptor进程。

--EOF--

Erlang运行时之Message Passing

本文试图分析Erlang消息传递机制,总结优缺点,大部分资料来源于论文『Characterizing the Scalability of Erlang VM on Many-core Processors』和stackoverflow,并且带有自己的理解,不当之处需多包涵。

Erlang进程间通信采用的策略是消息传递(Message Passing)。这里的传递是指传递消息副本,而不是传递消息指针或者引用。小消息的复制成本可以忽略,现在的DDR3 SDRAM内存带宽约为2000MT/s,64位数据总线下有近16GB/s的数据传输速度。从VM的角度看,消息传递的本质就是调度器将消息从发送者进程的堆中复制到接收者进程的堆中。基于复制的消息传递有一个好处,可以非常方便实现以进程为单位的GC,因为GC范围是进程私有堆,不必考虑堆中的数据是否被其他进程引用。但是并不是所有消息都是通过复制实现消息传递,大于64K的二进制数据是通过共享内存实现的,前文已有提及。消息太大时,进程GC频繁触发,并且大消息复制成本也无法忽略,这两点带来的性能损耗可能比共享对象引用计数造成的性能损失还要大,因此,Erlang权衡之下对二进制数据采用了不同的消息传递策略。

在多核时代,不同的Erlang进程被不同的调度器执行。当接收进程正被其他调度器执行,而它的堆空间不够或者正有其他消息往堆中发送消息时,发送进程会分配一块临时的Heap Fragment(参考『Erlang运行时之进程』中的图1),用于存储待发送消息的副本,这块Heap Fragment数据会在GC期间被合入接收进程的堆空间。发送进程把消息复制完毕后,生成一个该消息的元数据结构(data management structure),该结构体包含了指向消息副本(处在接收进程堆或者Heap Fragment里)的指针,最后将元数据结构体放入接受进程的消息队列(mailbox)里。如果此时接收进程处于suspended状态,那么它会被激活,处理该消息;如果本来就是运行状态,那么消息会被等待取走进行匹配。

接收进程的mailbox在逻辑上是一个消息队列,但在物理上是通过两个物理队列实现,分别为public queue和private queue。前者对应PCB中的ErlMessageInQueue,只有在支持SMP的erts才有,用于接收其他进程发送来的消息,发送进程操作此队列时需要加锁互斥访问,避免不同进程操作同一块内存;后者对应PCB中的ErlMessageQueue,是接收进程取消息的队列,这样接收进程就可以不必关心锁带来的额外开销了,否则接收进程必须通过获取锁才能安全地读取完整的消息数据。当private queue为空时,public queue的消息会被追加到private queue里。由此可见,mailbox(含public queue和private queue)本身处在PCB中,而不是在堆中。ErlMessageInQueue和ErlMessageQueue的定义(erlang/erts/emulator/beam/erl_message.h)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct {
    ErlMessage* first;
    ErlMessage** last;  /* point to the last next pointer */
    ErlMessage** save;
    int len;            /* queue length */
} ErlMessageQueue;
 
#ifdef ERTS_SMP
typedef struct {
    ErlMessage* first;
    ErlMessage** last;  /* point to the last next pointer */
    int len;            /* queue length */
} ErlMessageInQueue;
#endif

其中,ErlMessageQueue结构体有个save属性,这个save链表就是进程selective recieve时匹配失败后将消息暂存的save queue。

在支持SMP的Erlang虚拟机里,消息传递机制会引入一些额外消耗,这些消耗会在一定程度上影响到CPU核数在erts上的可扩展性:
1. 上文已有提及,分属不同调度器的发送进程向同一个接收进程发送消息时,元数据结构入public queue需要锁去同步。
2. 发送进程要从OS进程的堆空间分配内存供存储消息(或者Heap Fraigment)和元数据使用,这个分配内存的操作需要加锁。同理,内存释放操作也同样需要加锁。
3. 当大量进程并行执行时,他们的消息收发顺序相对于单核模式下是乱序的,虽然不影响最终接收进程的消息匹配,但是会带来损耗,原因是进程的消息接收采用selective receive机制,有些消息会被匹配多次。

Erlang的消息传递机制基于复制。我觉得,通过传递消息副本来避免锁不是Erlang高效的直接原因,因为有变量不变性(immutable variable)保障,即使传递引用也可以避免不必要的锁开销,况且,在支持SMP的运行时环境下,锁作为同步手段也经常使用。但是基于复制带来的好处是方便实现以进程为粒度的GC机制,从而在间接上提升了Erlang的并发性。

--EOF--

Erlang运行时之进程

本文试图从进程角度解释Erlang之所以高效的原因,大部分资料来源于论文『Characterizing the Scalability of Erlang VM on Many-core Processors』,并且带有自己的理解,不当之处请多包涵。

Erlang作为一门面向并发的语言(Concurrent Oriented Programming, COP),进程扮演着重要的作用,可以说Erlang就是一门面向进程的语言。归根到底,Erlang的核心概念无非就是进程、模式匹配、消息传递三大法宝。

目前主流的Erlang虚拟机是BEAM(Bogdan/Bjrn’s Erlang Abstract Machine),早期的JAM, old BEAM现都已经废弃不用。Erlang虚拟机是运行在操作系统中的一个多线程进程。Linux下,用POSIX线程库(pthread)实现,多线程共享进程(VM)的内存空间。一般来说,Erlang虚拟机会为每个CPU核分配两个线程,一个负责IO,一个作为调度器负责调度Erlang进程。

Erlang进程是虚拟机级别的进程,它非常轻量,初始化时只有2K左右,Erlang官方文档有给出测试初始进程占用内存大小的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Eshell V6.1  (abort with ^G)
1> Fun = fun() -> receive after infinity -> ok end end.
#Fun<erl_eval.20.90072148>
2> Pid = spawn(Fun).
<0.35.0>
3> {_,Bytes} = process_info(Pid, memory).
{memory,2680}
4> Bytes div erlang:system_info(wordsize).
335
5> erlang:process_info(Pid).
 ......
 {total_heap_size,233},
 {heap_size,233},
 {stack_size,9},
 ......

可以看到,一个进程包含堆栈在内只需2680B内存,其中堆(含栈)大小为233个字,64位系统下一个字等于8个字节,堆栈占用1864B。实际上,如果只计算PCB,大约只占300B,相比Linux PCB的1K也轻量不小。另外,Joe Armstrong在『Progamming Erlang』中也有过示范,物理内存充足的情况下,spawn一个进程只需花费微秒数量级的时间。因此,Erlang系统中允许同时存在成千上万的进程。

Erlang Process
图1

图1是Erlang进程的内部组成,每个进程都由独立的进程控制块(PCB, process control block)、栈和私有堆三部分组成。PCB包含的信息有进程ID、堆栈起始地址、mailbox、程序寄存器(PC)、参数寄存器等等,完整定义可以参考Erlang运行时(erts)源代码头文件erlang/erts/emulator/beam/erl_process.h中的process结构体,以下是几个主要字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct process {
    Eterm* htop;    /* Heap top */
    Eterm* stop;    /* Stack top */
    Eterm* heap;    /* Heap start */
    Eterm* hend;    /* Heap end */
    Uint heap_sz;   /* Size of heap in words */
    Uint min_heap_size; /* Minimum size of heap (in words). */
    Eterm* i;       /* Program counter for threaded code. */
    Uint32 status;  /* process STATE */
    Eterm id;       /* The pid of this process */
    Uint reds;      /* No of reductions for this process  */
    Process *next;  /* Pointer to next process in run queue */
    Process *prev;  /* Pointer to prev process in run queue */
    ErlMessageQueue msg;/* Message queue */
#ifdef ERTS_SMP
    ErlMessageInQueue msg_inq;
#endif
    ......
};

进程堆和栈共同占用一块连续的内存空间,堆空间由低地址向高地址增长,栈空间由高地址向低地址增长,当堆顶和栈顶一样时,可以判定堆栈空间已满,需要通过垃圾回收空间和或者增长空间。在Erlang进程看来,这块堆栈内存是独占的,进程间彼此隔离;在操作系统进程看来,所有Erlang进程的堆栈空间都在自己的堆空间里。Erlang进程里堆空间和栈空间存放数据类型有所区分,前者主要是一些复合数据,比如元组、列表和大数等,后者主要存放一些简单数据类型以及堆中复合数据的引用。

图2是以列表和元组为例展示了Erlang进程中堆栈的内存布局:

Heap Layout
图2

Erlang是动态类型语言,变量类型需要到运行时才能确定,因此,堆栈中每个数据都有一个Type标签表示其类型。元组在堆中是以Array的形式存储,有字段表示元组大小,并且在栈中有一个指针(引用)指向这块堆空间,因为是连续空间,要取出元组中的数据只需O(1)的复杂度计算内存偏移量即可。对于列表来说,列表元素在堆中是以链表形式存在的,由栈中的一个指针指向列表的第一个元素。相邻列表元素在内存中并不连续,也没有字段表示列表大小,因此要获取列表大小只能通过遍历,这个操作是个O(N)的时间复杂度。对于lists:append(ListB, ListA)这个操作,Erlang做的事情是先复制ListB,遍历ListB,找到列表尾部元素,将下一元素指针从NIL改为ListA第一个元素地址。由此可知,要提高性能,最好是将较长的列表追加到较短的列表上,以减少遍历时间。另外,也不要试图在列表尾部追加元素,原因同上,之前的一篇『Erlang列表操作性能分析』对此已做过分析。如果将ListC当做消息内容发送给其他进程,则整个ListC列表都会复制一份,即使往同一个节点发送多次,复制也会进行多次,这往往会导致消息接收进程占用的内存空间比发送进程大,因为对接收进程来说,每次接到的ListC会被当成不同的数据。

从图2还可以看出,ListA和ListC共享了部分数据,也就说,在一个Erlang进程内部,是存在内存共享的情况的。在Erlang里,变量拥有不变性,一次赋值(模式匹配)成功,它就不会再变,因此,ListA和ListC可以永远安全地共享这些数据。当然,Erlang中的内存共享在其他场景下也会出现,在图1中所示的有两块内存共享区域,一块是二进制数据共享区,用于存储大于64K的二进制数据;另一块是存储ETS表用的,ETS可以供每个进程访问,相比真正的共享内存有一些不同,它基于消息复制以记录为单位进行存取;相比数据库,它弱了很多,不支持事务机制。

当进程堆栈空间满时,会触发调度器对进程进行GC,如果GC结束堆栈空间仍然不足,则会分配新空间。Erlang的GC是以进程为单位,对某个进程GC不会影响其他进程的执行,虽然对单个进程来说,存在stop the world的现象,但是从全局来看,其他进程不会受影响,这个特性使得Erlang能够应付大规模高并发的业务场景,基于Elrang的业务系统可以达到软实时的级别。另外,一旦进程生命周期结束,GC可以非常方便地直接回收这个进程占用的所有内存。

综上,从进程的角度的来看,使得Erlang高效主要是以下方面:
1. 进程本身轻量。所以线程池的概念在Erlang语言层面根本不存在。
2. 变量不变性避免了很多无谓的数据复制。如List操作,直接通过修改指针实现append操作。
3. 以进程为单位进行GC。

References:
[1] Characterizing the Scalability of Erlang VM on Many-core Processors
[2] Erlang does have shared memory
[3] Erlang vs Java memory architecture
[4] Erlang - Programming the Parallel World

--EOF--

gen_server behaviour exercise

In these exercises, we’ll make a server in the module job_centre, which uses gen_server to implement a job management service. The job center keeps a queue of jobs that have to be done. The jobs are numbered. Anybody can add jobs to the queue. Workers can request jobs from the queue and tell the job center that a job has been performed. Jobs are represented by funs. To do a job F, a worker must evaluate the function F().
1. Implement the basic job center functionality, with the following interface:
job_centre:start_link() -> true.
    Start the job center server.
job_centre:add_job(F) -> JobNumber.
    Add a job F to the job queue. Return an integer job number.
job_centre:work_wanted() -> {JobNumber,F} | no.
    Request work. When a worker wants a job, it calls job_centre:work_wanted(). If there are jobs in the queue, a tuple {JobNumber, F} is returned. The worker performs the job by evaluating F(). If there are no jobs in the queue, no is returned. Make sure the same job cannot be allocated to more than one worker at a time. Make sure that the system is fair, meaning that jobs are handed out in the order they were requested.
job_centre:job_done(JobNumber)
    Signal that a job has been done. When a worker has completed a job, it must call job_centre:job_done(JobNumber).
2. Add a statistics call called job_centre:statistics() that reports the status of the jobs in the queue and of jobs that are in progress and that have been done.
3. Add code to monitor the workers. If a worker dies, make sure the jobs it was doing are returned to the pool of jobs waiting to be done.
4. Check for lazy workers; these are workers that accept jobs but don’t deliver on time. Change the work wanted function to return {JobNumber, JobTime, F} where JobTime is a time in seconds that the worker has to complete the job by. At time JobTime - 1, the server should send a hurry_up message to the worker if it has not finished the job. And at time JobTime + 1, it should kill the worker process with an exit(Pid, youre_fired) call.

Ans:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
-module(job_center).
-behaviour(gen_server).
 
-export([start_link/0, add_job/2, work_wanted/0, job_done/1, statistics/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(TIME, 1000).
-record(state, {undo, doing, done, index}).
%%%===================================================================
%%% API
%%%===================================================================
start_link() ->
  gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
add_job(F, Time) ->
  gen_server:call(?MODULE, {add_job, F, Time}).
work_wanted() ->
  gen_server:call(?MODULE, get_job).
job_done(JobNumber) ->
  gen_server:cast(?MODULE, {job_done, JobNumber}).
statistics() ->
  gen_server:call(?MODULE, statistics).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
  process_flag(trap_exit, true),
  timer:send_interval(?TIME, self(), check_worker),
  {ok, #state{undo = queue:new(), doing = [], done = [], index = 1}}.
 
handle_call({add_job, F, Time}, _From, #state{undo = UndoPool, index = Idx} = State) ->
  State1 = State#state{undo = queue:in({Idx, Time, F}, UndoPool), index = Idx + 1},
  {reply, Idx, State1};
handle_call(get_job, {From, _}, #state{undo = UndoPool, doing = DoingPool} = State) ->
  {Reply, State1} = case queue:out(UndoPool) of
                      {{value, {Idx, Time, Fun}}, UndoPool2} ->
                        {{Idx, Time, Fun}, State#state{undo = UndoPool2, doing = [{Idx, Time, Fun, From, now_in_secs()} | DoingPool]}};
                      {empty, _} ->
                        {no, State}
                    end,
  erlang:monitor(process, From),
  {reply, Reply, State1};
handle_call(statistics, _From, #state{undo = UndoPool, doing = DoingPool, done = DonePool} = State) ->
  io:format("undo:~p~n", [UndoPool]),
  io:format("doing:~p~n", [DoingPool]),
  io:format("done:~p~n", [DonePool]),
  {reply, [{undo, queue:len(UndoPool)}, {doing, length(DoingPool)}, {done, length(DonePool)}], State};
handle_call(_Request, _From, State) ->
  {reply, ok, State}.
 
handle_cast({job_done, JobNumber}, #state{doing = DoingPool, done = DonePool} = State) ->
  Item = lists:keyfind(JobNumber, 1, DoingPool),
  DoingPool2 = lists:delete(Item, DoingPool),
  {Idx, Time, Fun, From, _} = Item,
  DonePool2 = [{Idx, Time, Fun, From} | DonePool],
  {noreply, State#state{doing = DoingPool2, done = DonePool2}};
handle_cast(_Request, State) ->
  {noreply, State}.
 
handle_info({'DOWN', _Ref, process, From, normal}, #state{undo = UndoPool, doing = DoingPool} = State) ->
  State1 = case lists:keyfind(From, 4, DoingPool) of
             Item when is_tuple(Item) ->
               {Idx, Time, Fun, _From, _} = Item,
               DoingPool2 = lists:delete(Item, DoingPool),
               State#state{undo = queue:in({Idx, Time, Fun}, UndoPool), doing = DoingPool2};
             false -> State
           end,
  {noreply, State1};
handle_info(check_worker, #state{undo = UndoPool, doing = DoingPool} = State) ->
  Now = now_in_secs(),
  F = fun(Doing) ->
    {Idx, Time, Fun, From, StartTime} = Doing,
    if
      Now =:= StartTime + Time - 1 -> From ! hurry_up;
      Now =:= StartTime + Time + 1 ->
        exit(From, youre_fired),
        State#state{undo = queue:in({Idx, Time, Fun}, UndoPool)};
      true -> ok
    end
  end,
  lists:foreach(F, DoingPool),
  {noreply, State};
handle_info(Info, State) ->
  io:format("extra info:~p~n", [Info]),
  {noreply, State}.
 
terminate(_Reason, _State) ->
  ok.
code_change(_OldVsn, State, _Extra) ->
  {ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
now_in_secs() ->
  {A, B, _} = os:timestamp(),
  A * 1000000 + B.

Test:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
$ erl -sname aa
Erlang/OTP 17 [erts-6.1] [source] [64-bit] [smp:4:4] [async-threads:10]
 
Eshell V6.1  (abort with ^G)
1> c(job_center).
{ok,job_center}
2> F1 = fun() -> 1 end.
#Fun<erl_eval.20.90072148>
3> F2 = fun() -> 2 end.
#Fun<erl_eval.20.90072148>
4> job_center:add_job(F1, 10).
1
5> job_center:add_job(F2, 10).
2
6> job_center:add_job(F1, 20).
3
7> job_center:add_job(F2, 20).
4
8> job_center:statistics().
undo:{[{4,20,#Fun<erl_eval.20.90072148>},
       {3,20,#Fun<erl_eval.20.90072148>},
       {2,10,#Fun<erl_eval.20.90072148>}],
      [{1,10,#Fun<erl_eval.20.90072148>}]}
doing:[]
done:[]
[{undo,4},{doing,0},{done,0}]
9> job_center:work_wanted().
{1,10,#Fun<erl_eval.20.90072148>}
10> job_center:job_done(1).
ok
11> job_center:statistics().
undo:{[{4,20,#Fun<erl_eval.20.90072148>},{3,20,#Fun<erl_eval.20.90072148>}],
      [{2,10,#Fun<erl_eval.20.90072148>}]}
doing:[]
done:[{1,10,#Fun<erl_eval.20.90072148>,<0.83.0>}]
[{undo,3},{doing,0},{done,1}]
12> job_center:work_wanted().
{2,10,#Fun<erl_eval.20.90072148>}
13> receive V -> V end.
hurry_up
 
=ERROR REPORT==== 24-Aug-2014::14:37:03 ===
** Generic server job_center terminating
** Last message in was {'EXIT',<0.83.0>,youre_fired}
** When Server state == {state,{[{4,20,#Fun<erl_eval.20.90072148>}],
                                [{3,20,#Fun<erl_eval.20.90072148>}]},
                               [{2,10,#Fun<erl_eval.20.90072148>,<0.83.0>,
                                 1408862212}],
                               [{1,10,#Fun<erl_eval.20.90072148>,<0.83.0>}],
                               5}
** Reason for termination ==
** youre_fired
** exception error: youre_fired

Exercises from 『Programming Erlang (2Edtion)』.

--EOF--