分类目录归档:Erlang

RabbitMQ与Erlang

Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用),那么消息传递有没有开销?有,但是基本可以忽略,消息传递在单机上的本质就是内存复制,要知道DDR3 SDRAM的内存带宽约为16GB/s。

RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器,那么Erlang从语言层面带给RabbitMQ有哪些直接好处?

1. 高并发。
Erlang进程是完全由Erlang虚拟机进行调度和生命周期管理的一种数据结构,它与操作系统进程及线程完全没关系,也不存在数值上的什么对应关系。实际上,一个Erlang虚拟机对应一个操作系统进程,一个Erlang进程调度器对应一个操作系统线程,一般来说,有多少个CPU核就有多少个调度器。Erlang进程都非常轻量级,初始状态只包括几百个字节的PCB和233个字大小的私有堆栈,并且创建和销毁也非常轻量,都在微秒数量级。这些特征使得一个Erlang虚拟机里允许同时存在成千上万个进程,这些进程可以被公平地调度到各个CPU核上执行,因此可以在多核的场景下充分利用资源。

在RabbitMQ的进程模型里,AMQP概念里的channel和queue都被设计成了Erlang进程,从接受客户端连接请求开始,到消息最终持久化到磁盘,消息经过的进程链如下:
RabbitMQ Processes
上图中,tcp_acceptor进程用于接收客户端连接,然后初始化出rabbit_reader,rabbit_writer和rabbit_channel进程。rabbit_reader进程用于接收客户端数据,解析AMQP帧。rabbit_writer进程用于向客户端返回数据。rabbit_channel进程解析AMQP方法,然后进行消息路由等操作,是RabbitMQ的核心进程。rabbit_amqqueue_process是队列进程,rabbit_msg_store是负责进行消息持久化的进程,这两种类型进程都是RabbitMQ启动或者创建队列时创建的。从数量角度看,整个系统中存在,一个tcp_acceptor进程,一个rabbit_msg_store进程,多少个队列就有多少个rabbit_amqqueue_process进程,每条客户端连接对应一个rabbit_reader和rabbit_writer进程,至多对应65535个rabbit_channel进程。结合进程的数量,RabbitMQ的进程模型也可以描述如下图:
messages

RabbitMQ这种细粒度的进程模型正是得益于Erlang的高并发性。

2. 软实时。
Erlang的软实时特性可以从两方面看出。

首先是Erlang也是一门GC语言,但是Erlang的垃圾回收是以Erlang进程为粒度的。因为Erlang的消息传递和进程私有堆机制,使得按进程进行GC很容易实现,不必对一块内存或一个对象进行额外的引用计数。虽然对于单个进程来说,GC期间是“Stop The World”,但是前面也说过一个Erlang应用允许同时存在成千上万个进程,因此一个进程STW对于系统整体性能影响几乎微乎其微。另外,当进程需要销毁时,这个进程占用的所有内存可以直接回收,因为这块内存中的数据都是这个进程私有的。

另一方面,Erlang虚拟机对进程的调度采用的是抢占式策略。每个进程的运行周期都会分配到一定数量的reduction,当进程在进行外部模块函数调用,BIF调用,甚至算术运算都会减少reduction数量,当reduction数量减为0时,此进程交出CPU使用权,被其他进程抢占。相比于一些基于时间分片的软实时系统调度算法,reduction机制更加关注的是进程在执行期间能做多少事情,而不是时间上的绝对平均。

RabbitMQ将每个队列设计为一个Erlang进程,由于进程GC也是采用分代策略,当新老生代一起参与Major GC时,Erlang虚拟机会新开内存,根据root set将存活的对象拷贝至新空间,这个过程会造成新老内存空间同时存在,极端情况下,一个队列可能短期内需要两倍的内存占用量,因此设置内存流控阈值vm_memory_high_watermark时需要注意,默认的0.4就是一个保险的值,当超过0.5时,就有可能造成系统内存被瞬间吃完,RabbitMQ程序被系统OOM Killer杀掉。

3. 分布式。
Erlang可以说原生支持分布式,先看一段程序:

1
2
3
4
5
6
7
8
9
10
run(Node) -> 
    Pid = spawn(Node, fun ping/0), 
    Pid ! self(), 
    receive 
        ping -> ok 
    end. 
ping() ->
    receive
        From -> From ! ping 
    end.

上述程序演示的是一个分布式并发程序,运用了spawn/2,!,receive...end这三个并发原语。spawn/2用于创建进程,!用于异步发送消息,receive...end用于接收消息,注意spawn/2的第一个参数Node,它表示节点名称,这意味着对于应用来说,Pid ! Msg 就是将Msg消息发送到某一个Erlang进程,而无论这个进程是本地进程还是存在于远程的某个节点上,Erlang虚拟机会帮应用搞定一切底层通信机制。也就是说,物理节点分布式对上层Erlang应用来说是透明的。

这为实现RabbitMQ的集群和HA policy机制提供了极大的便利,主节点只要维护哪个是Pid(master),哪几个是slave_pids(slave)信息就行,根据不同的类型(publish和非publish),对队列操作进行replication。

4. 健壮性。
在Erlang的设计哲学里,有一个重要的概念就是“let it crash”。Erlang不提倡防御式编程,它认为程序既然遇到错误就应该让它崩溃,对于一个健壮的系统来说,崩溃不要紧,关键要重新起来。Erlang提供一种supervisor的行为模式,用于构建一棵健壮的进程监督树。监督者进程本身不包含业务逻辑,它只负责监控子进程,当子进程退出时,监督者进程可以根据一些策略将子进程重启。据说爱立信用Erlang写的AXD301交换机系统,可靠性为9个9,这意味着运行20年差不多有1秒的不可用时间,如此高的可靠性就是supervisor行为模式及其背后任其崩溃思想的极致体现(当然也离不开Erlang另外一个法宝代码热更新)。

在RabbitMQ里,supervisor行为模式运用得非常多,基本上每个worker进程都有相应的监督者进程,层层监督。比如下图所示的网络层进程监督树模型(已做过简化):
RabbitMQ Supervisor Tree

椭圆表示进程,矩形表示重启策略,one_for_all表示一个进程挂了其监督者进程的其他子进程也会被重启,比如一个rabbit_reader进程挂了,那么rabbit_channel_sup3进程也会重启,然后所有rabbit_channel根据AMQP协议协商后重新创建。simple_one_for_one则表示一种需要时再初始化的子进程重启策略,适用于一些动态添加子进程的场景,比如图中的rabbit_channel进程和tcp_acceptor进程。

--EOF--

Erlang运行时之Message Passing

本文试图分析Erlang消息传递机制,总结优缺点,大部分资料来源于论文『Characterizing the Scalability of Erlang VM on Many-core Processors』和stackoverflow,并且带有自己的理解,不当之处需多包涵。

Erlang进程间通信采用的策略是消息传递(Message Passing)。这里的传递是指传递消息副本,而不是传递消息指针或者引用。小消息的复制成本可以忽略,现在的DDR3 SDRAM内存带宽约为2000MT/s,64位数据总线下有近16GB/s的数据传输速度。从VM的角度看,消息传递的本质就是调度器将消息从发送者进程的堆中复制到接收者进程的堆中。基于复制的消息传递有一个好处,可以非常方便实现以进程为单位的GC,因为GC范围是进程私有堆,不必考虑堆中的数据是否被其他进程引用。但是并不是所有消息都是通过复制实现消息传递,大于64K的二进制数据是通过共享内存实现的,前文已有提及。消息太大时,进程GC频繁触发,并且大消息复制成本也无法忽略,这两点带来的性能损耗可能比共享对象引用计数造成的性能损失还要大,因此,Erlang权衡之下对二进制数据采用了不同的消息传递策略。

在多核时代,不同的Erlang进程被不同的调度器执行。当接收进程正被其他调度器执行,而它的堆空间不够或者正有其他消息往堆中发送消息时,发送进程会分配一块临时的Heap Fragment(参考『Erlang运行时之进程』中的图1),用于存储待发送消息的副本,这块Heap Fragment数据会在GC期间被合入接收进程的堆空间。发送进程把消息复制完毕后,生成一个该消息的元数据结构(data management structure),该结构体包含了指向消息副本(处在接收进程堆或者Heap Fragment里)的指针,最后将元数据结构体放入接受进程的消息队列(mailbox)里。如果此时接收进程处于suspended状态,那么它会被激活,处理该消息;如果本来就是运行状态,那么消息会被等待取走进行匹配。

接收进程的mailbox在逻辑上是一个消息队列,但在物理上是通过两个物理队列实现,分别为public queue和private queue。前者对应PCB中的ErlMessageInQueue,只有在支持SMP的erts才有,用于接收其他进程发送来的消息,发送进程操作此队列时需要加锁互斥访问,避免不同进程操作同一块内存;后者对应PCB中的ErlMessageQueue,是接收进程取消息的队列,这样接收进程就可以不必关心锁带来的额外开销了,否则接收进程必须通过获取锁才能安全地读取完整的消息数据。当private queue为空时,public queue的消息会被追加到private queue里。由此可见,mailbox(含public queue和private queue)本身处在PCB中,而不是在堆中。ErlMessageInQueue和ErlMessageQueue的定义(erlang/erts/emulator/beam/erl_message.h)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct {
    ErlMessage* first;
    ErlMessage** last;  /* point to the last next pointer */
    ErlMessage** save;
    int len;            /* queue length */
} ErlMessageQueue;
 
#ifdef ERTS_SMP
typedef struct {
    ErlMessage* first;
    ErlMessage** last;  /* point to the last next pointer */
    int len;            /* queue length */
} ErlMessageInQueue;
#endif

其中,ErlMessageQueue结构体有个save属性,这个save链表就是进程selective recieve时匹配失败后将消息暂存的save queue。

在支持SMP的Erlang虚拟机里,消息传递机制会引入一些额外消耗,这些消耗会在一定程度上影响到CPU核数在erts上的可扩展性:
1. 上文已有提及,分属不同调度器的发送进程向同一个接收进程发送消息时,元数据结构入public queue需要锁去同步。
2. 发送进程要从OS进程的堆空间分配内存供存储消息(或者Heap Fraigment)和元数据使用,这个分配内存的操作需要加锁。同理,内存释放操作也同样需要加锁。
3. 当大量进程并行执行时,他们的消息收发顺序相对于单核模式下是乱序的,虽然不影响最终接收进程的消息匹配,但是会带来损耗,原因是进程的消息接收采用selective receive机制,有些消息会被匹配多次。

Erlang的消息传递机制基于复制。我觉得,通过传递消息副本来避免锁不是Erlang高效的直接原因,因为有变量不变性(immutable variable)保障,即使传递引用也可以避免不必要的锁开销,况且,在支持SMP的运行时环境下,锁作为同步手段也经常使用。但是基于复制带来的好处是方便实现以进程为粒度的GC机制,从而在间接上提升了Erlang的并发性。

--EOF--

Erlang运行时之进程

本文试图从进程角度解释Erlang之所以高效的原因,大部分资料来源于论文『Characterizing the Scalability of Erlang VM on Many-core Processors』,并且带有自己的理解,不当之处请多包涵。

Erlang作为一门面向并发的语言(Concurrent Oriented Programming, COP),进程扮演着重要的作用,可以说Erlang就是一门面向进程的语言。归根到底,Erlang的核心概念无非就是进程、模式匹配、消息传递三大法宝。

目前主流的Erlang虚拟机是BEAM(Bogdan/Bjrn’s Erlang Abstract Machine),早期的JAM, old BEAM现都已经废弃不用。Erlang虚拟机是运行在操作系统中的一个多线程进程。Linux下,用POSIX线程库(pthread)实现,多线程共享进程(VM)的内存空间。一般来说,Erlang虚拟机会为每个CPU核分配两个线程,一个负责IO,一个作为调度器负责调度Erlang进程。

Erlang进程是虚拟机级别的进程,它非常轻量,初始化时只有2K左右,Erlang官方文档有给出测试初始进程占用内存大小的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Eshell V6.1  (abort with ^G)
1> Fun = fun() -> receive after infinity -> ok end end.
#Fun<erl_eval.20.90072148>
2> Pid = spawn(Fun).
<0.35.0>
3> {_,Bytes} = process_info(Pid, memory).
{memory,2680}
4> Bytes div erlang:system_info(wordsize).
335
5> erlang:process_info(Pid).
 ......
 {total_heap_size,233},
 {heap_size,233},
 {stack_size,9},
 ......

可以看到,一个进程包含堆栈在内只需2680B内存,其中堆(含栈)大小为233个字,64位系统下一个字等于8个字节,堆栈占用1864B。实际上,如果只计算PCB,大约只占300B,相比Linux PCB的1K也轻量不小。另外,Joe Armstrong在『Progamming Erlang』中也有过示范,物理内存充足的情况下,spawn一个进程只需花费微秒数量级的时间。因此,Erlang系统中允许同时存在成千上万的进程。

Erlang Process
图1

图1是Erlang进程的内部组成,每个进程都由独立的进程控制块(PCB, process control block)、栈和私有堆三部分组成。PCB包含的信息有进程ID、堆栈起始地址、mailbox、程序寄存器(PC)、参数寄存器等等,完整定义可以参考Erlang运行时(erts)源代码头文件erlang/erts/emulator/beam/erl_process.h中的process结构体,以下是几个主要字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct process {
    Eterm* htop;    /* Heap top */
    Eterm* stop;    /* Stack top */
    Eterm* heap;    /* Heap start */
    Eterm* hend;    /* Heap end */
    Uint heap_sz;   /* Size of heap in words */
    Uint min_heap_size; /* Minimum size of heap (in words). */
    Eterm* i;       /* Program counter for threaded code. */
    Uint32 status;  /* process STATE */
    Eterm id;       /* The pid of this process */
    Uint reds;      /* No of reductions for this process  */
    Process *next;  /* Pointer to next process in run queue */
    Process *prev;  /* Pointer to prev process in run queue */
    ErlMessageQueue msg;/* Message queue */
#ifdef ERTS_SMP
    ErlMessageInQueue msg_inq;
#endif
    ......
};

进程堆和栈共同占用一块连续的内存空间,堆空间由低地址向高地址增长,栈空间由高地址向低地址增长,当堆顶和栈顶一样时,可以判定堆栈空间已满,需要通过垃圾回收空间和或者增长空间。在Erlang进程看来,这块堆栈内存是独占的,进程间彼此隔离;在操作系统进程看来,所有Erlang进程的堆栈空间都在自己的堆空间里。Erlang进程里堆空间和栈空间存放数据类型有所区分,前者主要是一些复合数据,比如元组、列表和大数等,后者主要存放一些简单数据类型以及堆中复合数据的引用。

图2是以列表和元组为例展示了Erlang进程中堆栈的内存布局:

Heap Layout
图2

Erlang是动态类型语言,变量类型需要到运行时才能确定,因此,堆栈中每个数据都有一个Type标签表示其类型。元组在堆中是以Array的形式存储,有字段表示元组大小,并且在栈中有一个指针(引用)指向这块堆空间,因为是连续空间,要取出元组中的数据只需O(1)的复杂度计算内存偏移量即可。对于列表来说,列表元素在堆中是以链表形式存在的,由栈中的一个指针指向列表的第一个元素。相邻列表元素在内存中并不连续,也没有字段表示列表大小,因此要获取列表大小只能通过遍历,这个操作是个O(N)的时间复杂度。对于lists:append(ListB, ListA)这个操作,Erlang做的事情是先复制ListB,遍历ListB,找到列表尾部元素,将下一元素指针从NIL改为ListA第一个元素地址。由此可知,要提高性能,最好是将较长的列表追加到较短的列表上,以减少遍历时间。另外,也不要试图在列表尾部追加元素,原因同上,之前的一篇『Erlang列表操作性能分析』对此已做过分析。如果将ListC当做消息内容发送给其他进程,则整个ListC列表都会复制一份,即使往同一个节点发送多次,复制也会进行多次,这往往会导致消息接收进程占用的内存空间比发送进程大,因为对接收进程来说,每次接到的ListC会被当成不同的数据。

从图2还可以看出,ListA和ListC共享了部分数据,也就说,在一个Erlang进程内部,是存在内存共享的情况的。在Erlang里,变量拥有不变性,一次赋值(模式匹配)成功,它就不会再变,因此,ListA和ListC可以永远安全地共享这些数据。当然,Erlang中的内存共享在其他场景下也会出现,在图1中所示的有两块内存共享区域,一块是二进制数据共享区,用于存储大于64K的二进制数据;另一块是存储ETS表用的,ETS可以供每个进程访问,相比真正的共享内存有一些不同,它基于消息复制以记录为单位进行存取;相比数据库,它弱了很多,不支持事务机制。

当进程堆栈空间满时,会触发调度器对进程进行GC,如果GC结束堆栈空间仍然不足,则会分配新空间。Erlang的GC是以进程为单位,对某个进程GC不会影响其他进程的执行,虽然对单个进程来说,存在stop the world的现象,但是从全局来看,其他进程不会受影响,这个特性使得Erlang能够应付大规模高并发的业务场景,基于Elrang的业务系统可以达到软实时的级别。另外,一旦进程生命周期结束,GC可以非常方便地直接回收这个进程占用的所有内存。

综上,从进程的角度的来看,使得Erlang高效主要是以下方面:
1. 进程本身轻量。所以线程池的概念在Erlang语言层面根本不存在。
2. 变量不变性避免了很多无谓的数据复制。如List操作,直接通过修改指针实现append操作。
3. 以进程为单位进行GC。

References:
[1] Characterizing the Scalability of Erlang VM on Many-core Processors
[2] Erlang does have shared memory
[3] Erlang vs Java memory architecture
[4] Erlang - Programming the Parallel World

--EOF--

gen_server behaviour exercise

In these exercises, we’ll make a server in the module job_centre, which uses gen_server to implement a job management service. The job center keeps a queue of jobs that have to be done. The jobs are numbered. Anybody can add jobs to the queue. Workers can request jobs from the queue and tell the job center that a job has been performed. Jobs are represented by funs. To do a job F, a worker must evaluate the function F().
1. Implement the basic job center functionality, with the following interface:
job_centre:start_link() -> true.
    Start the job center server.
job_centre:add_job(F) -> JobNumber.
    Add a job F to the job queue. Return an integer job number.
job_centre:work_wanted() -> {JobNumber,F} | no.
    Request work. When a worker wants a job, it calls job_centre:work_wanted(). If there are jobs in the queue, a tuple {JobNumber, F} is returned. The worker performs the job by evaluating F(). If there are no jobs in the queue, no is returned. Make sure the same job cannot be allocated to more than one worker at a time. Make sure that the system is fair, meaning that jobs are handed out in the order they were requested.
job_centre:job_done(JobNumber)
    Signal that a job has been done. When a worker has completed a job, it must call job_centre:job_done(JobNumber).
2. Add a statistics call called job_centre:statistics() that reports the status of the jobs in the queue and of jobs that are in progress and that have been done.
3. Add code to monitor the workers. If a worker dies, make sure the jobs it was doing are returned to the pool of jobs waiting to be done.
4. Check for lazy workers; these are workers that accept jobs but don’t deliver on time. Change the work wanted function to return {JobNumber, JobTime, F} where JobTime is a time in seconds that the worker has to complete the job by. At time JobTime - 1, the server should send a hurry_up message to the worker if it has not finished the job. And at time JobTime + 1, it should kill the worker process with an exit(Pid, youre_fired) call.

Ans:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
-module(job_center).
-behaviour(gen_server).
 
-export([start_link/0, add_job/2, work_wanted/0, job_done/1, statistics/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(TIME, 1000).
-record(state, {undo, doing, done, index}).
%%%===================================================================
%%% API
%%%===================================================================
start_link() ->
  gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
add_job(F, Time) ->
  gen_server:call(?MODULE, {add_job, F, Time}).
work_wanted() ->
  gen_server:call(?MODULE, get_job).
job_done(JobNumber) ->
  gen_server:cast(?MODULE, {job_done, JobNumber}).
statistics() ->
  gen_server:call(?MODULE, statistics).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
  process_flag(trap_exit, true),
  timer:send_interval(?TIME, self(), check_worker),
  {ok, #state{undo = queue:new(), doing = [], done = [], index = 1}}.
 
handle_call({add_job, F, Time}, _From, #state{undo = UndoPool, index = Idx} = State) ->
  State1 = State#state{undo = queue:in({Idx, Time, F}, UndoPool), index = Idx + 1},
  {reply, Idx, State1};
handle_call(get_job, {From, _}, #state{undo = UndoPool, doing = DoingPool} = State) ->
  {Reply, State1} = case queue:out(UndoPool) of
                      {{value, {Idx, Time, Fun}}, UndoPool2} ->
                        {{Idx, Time, Fun}, State#state{undo = UndoPool2, doing = [{Idx, Time, Fun, From, now_in_secs()} | DoingPool]}};
                      {empty, _} ->
                        {no, State}
                    end,
  erlang:monitor(process, From),
  {reply, Reply, State1};
handle_call(statistics, _From, #state{undo = UndoPool, doing = DoingPool, done = DonePool} = State) ->
  io:format("undo:~p~n", [UndoPool]),
  io:format("doing:~p~n", [DoingPool]),
  io:format("done:~p~n", [DonePool]),
  {reply, [{undo, queue:len(UndoPool)}, {doing, length(DoingPool)}, {done, length(DonePool)}], State};
handle_call(_Request, _From, State) ->
  {reply, ok, State}.
 
handle_cast({job_done, JobNumber}, #state{doing = DoingPool, done = DonePool} = State) ->
  Item = lists:keyfind(JobNumber, 1, DoingPool),
  DoingPool2 = lists:delete(Item, DoingPool),
  {Idx, Time, Fun, From, _} = Item,
  DonePool2 = [{Idx, Time, Fun, From} | DonePool],
  {noreply, State#state{doing = DoingPool2, done = DonePool2}};
handle_cast(_Request, State) ->
  {noreply, State}.
 
handle_info({'DOWN', _Ref, process, From, normal}, #state{undo = UndoPool, doing = DoingPool} = State) ->
  State1 = case lists:keyfind(From, 4, DoingPool) of
             Item when is_tuple(Item) ->
               {Idx, Time, Fun, _From, _} = Item,
               DoingPool2 = lists:delete(Item, DoingPool),
               State#state{undo = queue:in({Idx, Time, Fun}, UndoPool), doing = DoingPool2};
             false -> State
           end,
  {noreply, State1};
handle_info(check_worker, #state{undo = UndoPool, doing = DoingPool} = State) ->
  Now = now_in_secs(),
  F = fun(Doing) ->
    {Idx, Time, Fun, From, StartTime} = Doing,
    if
      Now =:= StartTime + Time - 1 -> From ! hurry_up;
      Now =:= StartTime + Time + 1 ->
        exit(From, youre_fired),
        State#state{undo = queue:in({Idx, Time, Fun}, UndoPool)};
      true -> ok
    end
  end,
  lists:foreach(F, DoingPool),
  {noreply, State};
handle_info(Info, State) ->
  io:format("extra info:~p~n", [Info]),
  {noreply, State}.
 
terminate(_Reason, _State) ->
  ok.
code_change(_OldVsn, State, _Extra) ->
  {ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
now_in_secs() ->
  {A, B, _} = os:timestamp(),
  A * 1000000 + B.

Test:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
$ erl -sname aa
Erlang/OTP 17 [erts-6.1] [source] [64-bit] [smp:4:4] [async-threads:10]
 
Eshell V6.1  (abort with ^G)
1> c(job_center).
{ok,job_center}
2> F1 = fun() -> 1 end.
#Fun<erl_eval.20.90072148>
3> F2 = fun() -> 2 end.
#Fun<erl_eval.20.90072148>
4> job_center:add_job(F1, 10).
1
5> job_center:add_job(F2, 10).
2
6> job_center:add_job(F1, 20).
3
7> job_center:add_job(F2, 20).
4
8> job_center:statistics().
undo:{[{4,20,#Fun<erl_eval.20.90072148>},
       {3,20,#Fun<erl_eval.20.90072148>},
       {2,10,#Fun<erl_eval.20.90072148>}],
      [{1,10,#Fun<erl_eval.20.90072148>}]}
doing:[]
done:[]
[{undo,4},{doing,0},{done,0}]
9> job_center:work_wanted().
{1,10,#Fun<erl_eval.20.90072148>}
10> job_center:job_done(1).
ok
11> job_center:statistics().
undo:{[{4,20,#Fun<erl_eval.20.90072148>},{3,20,#Fun<erl_eval.20.90072148>}],
      [{2,10,#Fun<erl_eval.20.90072148>}]}
doing:[]
done:[{1,10,#Fun<erl_eval.20.90072148>,<0.83.0>}]
[{undo,3},{doing,0},{done,1}]
12> job_center:work_wanted().
{2,10,#Fun<erl_eval.20.90072148>}
13> receive V -> V end.
hurry_up
 
=ERROR REPORT==== 24-Aug-2014::14:37:03 ===
** Generic server job_center terminating
** Last message in was {'EXIT',<0.83.0>,youre_fired}
** When Server state == {state,{[{4,20,#Fun<erl_eval.20.90072148>}],
                                [{3,20,#Fun<erl_eval.20.90072148>}]},
                               [{2,10,#Fun<erl_eval.20.90072148>,<0.83.0>,
                                 1408862212}],
                               [{1,10,#Fun<erl_eval.20.90072148>,<0.83.0>}],
                               5}
** Reason for termination ==
** youre_fired
** exception error: youre_fired

Exercises from 『Programming Erlang (2Edtion)』.

--EOF--

erlang map?

Joe Armstrong在『Programming Erlang(2 Edition)』中介绍了R17的新增特性:map数据结构,这称得上是里程碑式的大功能了。map的语法还算简单,以下是其定义:

1
#{Key1 Op Val1, Key2 Op Val2, ... , KeyN Op ValN}

其中,Op是=>或:=两个操作符中的其中一个。前者表示更新Key对应的Value或者添加K-V对;后者表示将Key对应的Value进行更新,如果Key不存在,则返回异常。

书中还介绍了map在模式匹配中的应用。比如下面这个demo就是通过map实现的计算某个字符串里各个字符的出现次数:

1
2
3
4
5
6
7
8
9
count_characters(Str) ->
count_characters(Str, #{}).
 
count_characters([H|T], #{ H => N } = X) ->
    count_characters(T, X#{ H := N+1 });
count_characters([H|T], X) ->
    count_characters(T, X#{ H => 1});
count_characters([], X) ->
    X.

纸上得来终觉浅,在我把本地的R15替换成R17,准备尝试的时候问题出现了,这个demo连编译都过不了:

1
2
3
4
5
6
1> c(demo).
demo.erl:7: illegal pattern
demo.erl:8: variable 'N' is unbound
demo.erl:10: illegal use of variable 'H' in map
demo.erl:7: Warning: variable 'H' is unused
error

里里外外翻了R17的release note,发现R17里map的实现远不是书中介绍的这么回事。

以下是R17中能支持的4种类型map操作:

1
2
3
4
M0 = #{ a => 1, b => 2}, % create associations
M1 = M0#{ a := 10 }, % update values
M2 = M1#{ "hi" => "hello"}, % add new associations
#{ "hi" := V1, a := V2, b := V3} = M2. % match keys with values

下面的特性是不支持的:
-- No variable keys (key中不能含变量)
-- No single value access (不支持取单个value,即不支持#{Key}操作)
-- No map comprehensions (不支持map推导,例如 #{X => foggy || X <- [london,boston]}.) 好在R17已经提供了一个maps模块,封装好了map的基础操作。通过这些内置函数,不难实现上文中的count_characters/1函数。

1
2
3
4
5
6
7
8
9
count_characters(Str) ->
  count_characters(Str, maps:new()).
count_characters([], M) ->
  M;
count_characters([H | T], M) ->
  case maps:find(H, M) of
    {ok, Val} -> count_characters(T, maps:update(H, Val + 1, M));
    error -> count_characters(T, maps:put(H, 1, M))
  end.

map的完整蓝图在EEP-43上有定义,书中介绍的应该是说将来map能支持这些操作,事实上R17.0所实现的只是map定义中很小的一个子集。关于Erlang map数据结构的前世今生这里有描述。另外有些文章认为,存储大量的K-V时,当前版本的map效率比dict, gb_tree要差,使用时需慎重,不过有些场景下用来替换record还是绰绰有余的。

--EOF--