分类目录归档:『Programming Erlang』

gen_server behaviour exercise

In these exercises, we’ll make a server in the module job_centre, which uses gen_server to implement a job management service. The job center keeps a queue of jobs that have to be done. The jobs are numbered. Anybody can add jobs to the queue. Workers can request jobs from the queue and tell the job center that a job has been performed. Jobs are represented by funs. To do a job F, a worker must evaluate the function F().
1. Implement the basic job center functionality, with the following interface:
job_centre:start_link() -> true.
    Start the job center server.
job_centre:add_job(F) -> JobNumber.
    Add a job F to the job queue. Return an integer job number.
job_centre:work_wanted() -> {JobNumber,F} | no.
    Request work. When a worker wants a job, it calls job_centre:work_wanted(). If there are jobs in the queue, a tuple {JobNumber, F} is returned. The worker performs the job by evaluating F(). If there are no jobs in the queue, no is returned. Make sure the same job cannot be allocated to more than one worker at a time. Make sure that the system is fair, meaning that jobs are handed out in the order they were requested.
job_centre:job_done(JobNumber)
    Signal that a job has been done. When a worker has completed a job, it must call job_centre:job_done(JobNumber).
2. Add a statistics call called job_centre:statistics() that reports the status of the jobs in the queue and of jobs that are in progress and that have been done.
3. Add code to monitor the workers. If a worker dies, make sure the jobs it was doing are returned to the pool of jobs waiting to be done.
4. Check for lazy workers; these are workers that accept jobs but don’t deliver on time. Change the work wanted function to return {JobNumber, JobTime, F} where JobTime is a time in seconds that the worker has to complete the job by. At time JobTime - 1, the server should send a hurry_up message to the worker if it has not finished the job. And at time JobTime + 1, it should kill the worker process with an exit(Pid, youre_fired) call.

Ans:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
-module(job_center).
-behaviour(gen_server).
 
-export([start_link/0, add_job/2, work_wanted/0, job_done/1, statistics/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(TIME, 1000).
-record(state, {undo, doing, done, index}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Start the job centre server, linked to the calling process.
%% The server is registered locally under the name ?SERVER.
start_link() ->
  RegisteredName = {local, ?SERVER},
  gen_server:start_link(RegisteredName, ?MODULE, [], []).
%% @doc Add job F to the queue of waiting jobs and return its job number.
%% Time is the number of seconds a worker is allowed for the job.
%% The server is addressed as ?SERVER, the name start_link/0 registers it under.
add_job(F, Time) ->
  gen_server:call(?SERVER, {add_job, F, Time}).
%% @doc Ask for work. Returns {JobNumber, Time, F} for the oldest waiting
%% job, or the atom no when the queue is empty.
%% The server is addressed as ?SERVER, the name start_link/0 registers it under.
work_wanted() ->
  gen_server:call(?SERVER, get_job).
%% @doc Tell the job centre that job JobNumber has been completed.
%% This is an asynchronous cast, so it returns ok immediately.
%% The server is addressed as ?SERVER, the name start_link/0 registers it under.
job_done(JobNumber) ->
  gen_server:cast(?SERVER, {job_done, JobNumber}).
%% @doc Print the waiting, in-progress and finished job pools, and return
%% their sizes as [{undo, N1}, {doing, N2}, {done, N3}].
%% The server is addressed as ?SERVER, the name start_link/0 registers it under.
statistics() ->
  gen_server:call(?SERVER, statistics).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @doc gen_server callback: start with empty job pools and job number 1.
%% Exit signals are trapped, and a check_worker message is scheduled to be
%% sent to this process every ?TIME milliseconds.
init([]) ->
  process_flag(trap_exit, true),
  %% send_interval/2 delivers the message to the calling process.
  timer:send_interval(?TIME, check_worker),
  EmptyState = #state{undo = queue:new(), doing = [], done = [], index = 1},
  {ok, EmptyState}.
 
%% @doc gen_server callback for the synchronous API.
%%
%% {add_job, F, Time} - enqueue F with an allowance of Time seconds and
%%                      reply with the job number assigned to it.
%% get_job            - hand the oldest waiting job to the caller as
%%                      {JobNumber, Time, F}, or reply no if none is waiting.
%% statistics         - print the three pools and reply with their sizes.
%% Any other request is answered with ok.
handle_call({add_job, F, Time}, _From, #state{undo = UndoPool, index = Idx} = State) ->
  State1 = State#state{undo = queue:in({Idx, Time, F}, UndoPool), index = Idx + 1},
  {reply, Idx, State1};
handle_call(get_job, {Worker, _Tag}, #state{undo = UndoPool, doing = DoingPool} = State) ->
  case queue:out(UndoPool) of
    {{value, {Idx, Time, Fun} = Job}, UndoPool2} ->
      %% Monitor only callers that actually receive a job; a caller that is
      %% told "no" holds nothing that would need to be returned to the queue.
      erlang:monitor(process, Worker),
      %% Record who took the job and when, for the deadline checks.
      Doing = {Idx, Time, Fun, Worker, now_in_secs()},
      {reply, Job, State#state{undo = UndoPool2, doing = [Doing | DoingPool]}};
    {empty, _} ->
      {reply, no, State}
  end;
handle_call(statistics, _From, #state{undo = UndoPool, doing = DoingPool, done = DonePool} = State) ->
  io:format("undo:~p~n", [UndoPool]),
  io:format("doing:~p~n", [DoingPool]),
  io:format("done:~p~n", [DonePool]),
  {reply, [{undo, queue:len(UndoPool)}, {doing, length(DoingPool)}, {done, length(DonePool)}], State};
handle_call(_Request, _From, State) ->
  {reply, ok, State}.
 
%% @doc gen_server callback for asynchronous requests.
%%
%% {job_done, JobNumber} moves the job from the doing pool to the done pool,
%% dropping its start time. A job number that is not in the doing pool is
%% ignored, so a stray or repeated job_done cannot crash the server.
%% All other casts are ignored.
handle_cast({job_done, JobNumber}, #state{doing = DoingPool, done = DonePool} = State) ->
  case lists:keytake(JobNumber, 1, DoingPool) of
    {value, {Idx, Time, Fun, Worker, _StartTime}, DoingPool2} ->
      {noreply, State#state{doing = DoingPool2, done = [{Idx, Time, Fun, Worker} | DonePool]}};
    false ->
      {noreply, State}
  end;
handle_cast(_Request, State) ->
  {noreply, State}.
 
%% @doc gen_server callback for plain messages.
%%
%% {'DOWN', ...} - a monitored worker has died, whatever the reason: every
%%                 job it still held goes back to the queue of waiting jobs.
%% check_worker  - periodic tick: a worker whose job is one second short of
%%                 its allowance is sent hurry_up; a worker that is at least
%%                 one second past it is sent exit(Pid, youre_fired) and its
%%                 job is put back on the queue.
%% Anything else is printed and otherwise ignored.
handle_info({'DOWN', _Ref, process, Worker, _Reason}, #state{undo = UndoPool, doing = DoingPool} = State) ->
  HeldByWorker = fun({_Idx, _Time, _Fun, Pid, _StartTime}) -> Pid =:= Worker end,
  {Orphaned, StillDoing} = lists:partition(HeldByWorker, DoingPool),
  {noreply, State#state{undo = requeue_jobs(Orphaned, UndoPool), doing = StillDoing}};
handle_info(check_worker, #state{undo = UndoPool, doing = DoingPool} = State) ->
  Now = now_in_secs(),
  Check = fun({_Idx, Time, _Fun, Worker, StartTime} = Job, {FiredAcc, KeptAcc}) ->
    Elapsed = Now - StartTime,
    if
      %% >= rather than =:= so that a late or skipped tick still fires.
      Elapsed >= Time + 1 ->
        exit(Worker, youre_fired),
        {[Job | FiredAcc], KeptAcc};
      Elapsed =:= Time - 1 ->
        Worker ! hurry_up,
        {FiredAcc, [Job | KeptAcc]};
      true ->
        {FiredAcc, [Job | KeptAcc]}
    end
  end,
  %% foldr keeps both result lists in the order of the doing pool.
  {Fired, Kept} = lists:foldr(Check, {[], []}, DoingPool),
  {noreply, State#state{undo = requeue_jobs(Fired, UndoPool), doing = Kept}};
handle_info(Info, State) ->
  io:format("extra info:~p~n", [Info]),
  {noreply, State}.

%% Append jobs taken from the doing pool to the waiting queue. The doing pool
%% is built by prepending, so folding from the right requeues the job that
%% was handed out first before the later ones.
requeue_jobs(Jobs, UndoPool) ->
  Requeue = fun({Idx, Time, Fun, _Worker, _StartTime}, Queue) ->
    queue:in({Idx, Time, Fun}, Queue)
  end,
  lists:foldr(Requeue, UndoPool, Jobs).
 
%% @doc gen_server callback: nothing to clean up on shutdown.
terminate(_Why, _FinalState) -> ok.
%% @doc gen_server callback: the state needs no conversion on code upgrade.
code_change(_FromVsn, Unchanged, _Options) ->
  {ok, Unchanged}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% @doc Current OS time as a whole number of seconds.
%% os:timestamp/0 returns {MegaSecs, Secs, MicroSecs}; the microseconds are dropped.
now_in_secs() ->
  {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
  Secs + MegaSecs * 1000000.

Test:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
$ erl -sname aa
Erlang/OTP 17 [erts-6.1] [source] [64-bit] [smp:4:4] [async-threads:10]
 
Eshell V6.1  (abort with ^G)
1> c(job_center).
{ok,job_center}
2> F1 = fun() -> 1 end.
#Fun<erl_eval.20.90072148>
3> F2 = fun() -> 2 end.
#Fun<erl_eval.20.90072148>
4> job_center:add_job(F1, 10).
1
5> job_center:add_job(F2, 10).
2
6> job_center:add_job(F1, 20).
3
7> job_center:add_job(F2, 20).
4
8> job_center:statistics().
undo:{[{4,20,#Fun<erl_eval.20.90072148>},
       {3,20,#Fun<erl_eval.20.90072148>},
       {2,10,#Fun<erl_eval.20.90072148>}],
      [{1,10,#Fun<erl_eval.20.90072148>}]}
doing:[]
done:[]
[{undo,4},{doing,0},{done,0}]
9> job_center:work_wanted().
{1,10,#Fun<erl_eval.20.90072148>}
10> job_center:job_done(1).
ok
11> job_center:statistics().
undo:{[{4,20,#Fun<erl_eval.20.90072148>},{3,20,#Fun<erl_eval.20.90072148>}],
      [{2,10,#Fun<erl_eval.20.90072148>}]}
doing:[]
done:[{1,10,#Fun<erl_eval.20.90072148>,<0.83.0>}]
[{undo,3},{doing,0},{done,1}]
12> job_center:work_wanted().
{2,10,#Fun<erl_eval.20.90072148>}
13> receive V -> V end.
hurry_up
 
=ERROR REPORT==== 24-Aug-2014::14:37:03 ===
** Generic server job_center terminating
** Last message in was {'EXIT',<0.83.0>,youre_fired}
** When Server state == {state,{[{4,20,#Fun<erl_eval.20.90072148>}],
                                [{3,20,#Fun<erl_eval.20.90072148>}]},
                               [{2,10,#Fun<erl_eval.20.90072148>,<0.83.0>,
                                 1408862212}],
                               [{1,10,#Fun<erl_eval.20.90072148>,<0.83.0>}],
                               5}
** Reason for termination ==
** youre_fired
** exception error: youre_fired

Exercises from 『Programming Erlang (2nd Edition)』.

--EOF--

erlang map?

Joe Armstrong在『Programming Erlang(2 Edition)』中介绍了R17的新增特性:map数据结构,这称得上是里程碑式的大功能了。map的语法还算简单,以下是其定义:

1
#{Key1 Op Val1, Key2 Op Val2, ... , KeyN Op ValN}

其中,Op是=>或:=两个操作符中的其中一个。前者表示更新Key对应的Value或者添加K-V对;后者表示将Key对应的Value进行更新,如果Key不存在,则返回异常。

书中还介绍了map在模式匹配中的应用。比如下面这个demo就是通过map实现的计算某个字符串里各个字符的出现次数:

1
2
3
4
5
6
7
8
9
%% Character counter as printed in the book: it matches on a map with a
%% variable key. NOTE: according to the compiler output quoted in this
%% article, R17 rejects this code ("illegal pattern", variable keys).
count_characters(Str) ->
count_characters(Str, #{}).
 
count_characters([H|T], #{ H => N } = X) ->
    count_characters(T, X#{ H := N+1 });
count_characters([H|T], X) ->
    count_characters(T, X#{ H => 1});
count_characters([], X) ->
    X.

纸上得来终觉浅,在我把本地的R15替换成R17,准备尝试的时候问题出现了,这个demo连编译都过不了:

1
2
3
4
5
6
1> c(demo).
demo.erl:7: illegal pattern
demo.erl:8: variable 'N' is unbound
demo.erl:10: illegal use of variable 'H' in map
demo.erl:7: Warning: variable 'H' is unused
error

里里外外翻了R17的release note,发现R17里map的实现远不是书中介绍的这么回事。

以下是R17中能支持的4种类型map操作:

1
2
3
4
M0 = #{ a => 1, b => 2}, % create a map with two associations
M1 = M0#{ a := 10 }, % update the value of the existing key a
M2 = M1#{ "hi" => "hello"}, % add a new association
#{ "hi" := V1, a := V2, b := V3} = M2. % match on keys to bind their values

下面的特性是不支持的:
-- No variable keys (key中不能含变量)
-- No single value access (不支持取单个value,即不支持#{Key}操作)
-- No map comprehensions (不支持map推导,例如 #{X => foggy || X <- [london,boston]}.)

好在R17已经提供了一个maps模块,封装好了map的基础操作。通过这些内置函数,不难实现上文中的count_characters/1函数。

1
2
3
4
5
6
7
8
9
%% @doc Count how often each character occurs in Str.
%% Returns a map from character to number of occurrences.
count_characters(Str) ->
  count_characters(Str, maps:new()).

%% Walk the string, keeping the running counts in the map Counts.
count_characters([Char | Rest], Counts) ->
  Seen = case maps:find(Char, Counts) of
           {ok, Count} -> Count;
           error -> 0
         end,
  count_characters(Rest, maps:put(Char, Seen + 1, Counts));
count_characters([], Counts) ->
  Counts.

map的完整蓝图在EEP-43上有定义,书中介绍的应该是说将来map能支持这些操作,事实上R17.0所实现的只是map定义中很小的一个子集。关于Erlang map数据结构的前世今生这里有描述。另外有些文章认为,存储大量的K-V时,当前版本的map效率比dict, gb_tree要差,使用时需慎重,不过有些场景下用来替换record还是绰绰有余的。

--EOF--

erlang concurrent primitive exercise

1. Write a function my_spawn(Mod, Func, Args) that behaves like spawn(Mod, Func, Args) but with one difference. If the spawned process dies, a message should be printed saying why the process died and how long the process lived for before it died.

1
2
3
4
5
6
7
8
9
10
11
12
13
%% @doc Like spawn(Mod, Func, Args), but when the spawned process dies a
%% line is printed with the exit reason and the lifetime in whole seconds.
%% Returns the pid of the spawned process.
my_spawn(Mod, Func, Args) ->
  %% Take the start time in the caller, right before spawning, so that the
  %% reported lifetime does not depend on when the watcher gets scheduled.
  Start = os:timestamp(),
  Pid = spawn(Mod, Func, Args),
  spawn(fun() ->
    %% If Pid has already exited, monitor/2 delivers 'DOWN' at once with
    %% reason noproc, so the watcher never blocks forever.
    Ref = monitor(process, Pid),
    receive
      {'DOWN', Ref, process, Pid, Why} ->
        %% timer:now_diff/2 yields microseconds and takes all three
        %% timestamp fields into account.
        Lifetime = timer:now_diff(os:timestamp(), Start) div 1000000,
        io:format("~p exit, reason: ~p, time: ~p~n", [Pid, Why, Lifetime])
    end
  end),
  Pid.

Keypoint: spawn/3, monitor/2, now/0.

2. Write a function my_spawn(Mod, Func, Args, Time) that behaves like spawn(Mod, Func, Args) but with one difference. If the spawned process lives for more than Time seconds, it should be killed.

1
2
3
4
5
6
7
8
9
%% @doc Like spawn(Mod, Func, Args), but the spawned process is sent
%% exit(Pid, timeout) if it is still alive after Time seconds.
%% Returns the pid of the spawned process.
my_spawn2(Mod, Func, Args, Time) ->
  Pid = spawn(Mod, Func, Args),
  spawn(fun() ->
    Ref = monitor(process, Pid),
    receive
      %% The process finished in time: the watcher can stop right away
      %% instead of sleeping for the whole timeout.
      {'DOWN', Ref, process, Pid, _Why} ->
        ok
    after Time * 1000 ->
      exit(Pid, timeout)
    end
  end),
  Pid.

Keypoint: spawn/1, monitor/2, receive after end, exit/2.

3. Write a function that creates a registered process that writes out "running" every five seconds. Write a function that monitors this process and restarts it if it dies.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-export([monitor_process/0]).
 
%% @doc Start the "running" process and watch over it, restarting it each
%% time it dies. The watch loop recurses forever, so this call does not return.
monitor_process() ->
  loop_monitor(register_process()).
 
%% @doc Worker loop: print "<pid> running", wait five seconds, repeat forever.
start() ->
  io:format("~p running~n", [self()]),
  receive after 5000 -> ok end,
  start().
 
%% @doc Spawn the "running" process, register it under the name
%% running_process as the exercise requires, and return its pid.
%% Fails with badarg if the name is already taken.
register_process() ->
  Pid = spawn(fun start/0),
  true = register(running_process, Pid),
  Pid.
 
%% @doc Monitor Pid; when it dies, report why, start a replacement through
%% register_process/0 and monitor that one in turn.
loop_monitor(Pid) ->
  MonRef = monitor(process, Pid),
  Reason = receive
             {'DOWN', MonRef, process, Pid, Why} -> Why
           end,
  io:format("~p shutdown due to ~p, restart it.~n", [Pid, Reason]),
  loop_monitor(register_process()).

Keypoint: timer:sleep/1, monitor/2, tail recursion.

4. Write a function that starts and monitors several worker processes. If any of the worker processes die, restart it.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-export([restart_one_for_one/0]).
%% @doc Start ten monitored workers, then restart each one individually
%% whenever it dies. The restart loop runs forever.
restart_one_for_one() ->
  lists:foreach(fun(_) -> start_and_monitor() end, lists:seq(1, 10)),
  loop().
 
%% @doc Worker loop: print "<pid> running", wait five seconds, repeat forever.
start() ->
  io:format("~p running~n", [self()]),
  receive after 5000 -> ok end,
  start().
 
%% @doc Start one worker running start/0 and monitor it from the calling
%% process. Returns the monitor reference.
%% spawn_monitor/1 creates the process and the monitor atomically, so the
%% worker cannot die in the gap between spawning and monitoring.
start_and_monitor() ->
  {_Pid, Ref} = spawn_monitor(fun start/0),
  Ref.
 
%% @doc Wait for a monitored worker to die, report it, start a replacement
%% and keep waiting. Never returns.
loop() ->
  receive
    {'DOWN', _Ref, process, DeadPid, Reason} ->
      io:format("~p shutdown due to ~p, restart it.~n", [DeadPid, Reason]),
      start_and_monitor(),
      loop()
  end.

Keypoint: monitor/2, tail recursion, list comprehension.

5. Write a function that starts and monitors several worker processes. If any of the worker processes die, kill all the worker processes and restart them all.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-export([restart_one_for_all/0]).
 
%% @doc Start a coordinating process that spawns ten workers linked to
%% itself and then waits forever; monitor the coordinator and hand over to
%% restart_handler/0, which starts the whole group again once the
%% coordinator goes down.
restart_one_for_all() ->
  RunGroup = fun() ->
    lists:foreach(fun(_) -> spawn_link(fun start/0) end, lists:seq(1, 10)),
    receive after infinity -> true end
  end,
  CenterPid = spawn(RunGroup),
  io:format("Monitor Pid ~p~n", [CenterPid]),
  monitor(process, CenterPid),
  restart_handler().
 
%% @doc Worker loop: print "<pid> running", wait five seconds, repeat forever.
start() ->
  io:format("~p running~n", [self()]),
  receive after 5000 -> ok end,
  start().
 
%% @doc Block until a 'DOWN' message arrives, report which process died and
%% why, then start the whole worker group again.
restart_handler() ->
  {DeadPid, Reason} = receive
                        {'DOWN', _Ref, process, Pid, Why} -> {Pid, Why}
                      end,
  io:format("~p shutdown due to ~p, restart it.~n", [DeadPid, Reason]),
  restart_one_for_all().

Keypoint: monitor/2, spawn_link/1, list comprehension, receive after infinity end.

--EOF--

erlang bit syntax

已知erlang bit syntax语法:Bit Syntax

1. 编写一个函数来反转某个二进制型的字节顺序。

1
2
%% @doc Return a binary holding the bytes of Bin in reverse order.
reverse(Bin) ->
  Bytes = binary_to_list(Bin),
  list_to_binary(lists:reverse(Bytes)).

2. 编写一个term_to_packet(Term) -> Packet函数,通过调用term_to_binary(Term)来生成并返回一个二进制型,它内含长度为4个字节的包头N,后跟N个字节的数据。

1
2
3
4
%% @doc Encode Term with term_to_binary/1 and prefix it with a 4-byte
%% header holding the payload length N, followed by the N payload bytes.
term_to_packet(Term) ->
  Payload = term_to_binary(Term),
  <<(byte_size(Payload)):32, Payload/binary>>.

3. 编写一个反转函数packet_to_term(Packet) -> Term, 使它成为前一个函数的逆向函数。

1
2
3
%% @doc Inverse of term_to_packet/1: read the 4-byte length header N, take
%% exactly N payload bytes and decode them. Bytes after the payload are
%% ignored; a packet shorter than its header claims fails with badmatch.
%% NOTE(review): binary_to_term/1 on untrusted input can create atoms and
%% funs; consider binary_to_term(Bin, [safe]) if packets come from outside.
packet_to_term(Packet) ->
  <<Len:32, Bin:Len/binary, _Rest/binary>> = Packet,
  binary_to_term(Bin).

4. 编写一个函数来反转某个二进制型所包含的位。

1
2
3
4
5
6
7
%% @doc Return a bitstring holding the bits of Bin in reverse order.
bit_reverse(Bin) ->
  bit_reverse(Bin, <<>>).

%% Move one bit at a time from the front of the input to the front of the
%% accumulator. The bit is taken apart in the clause head, so anything that
%% is not a bitstring fails with function_clause.
bit_reverse(<<Bit:1, Rest/bitstring>>, Acc) ->
  bit_reverse(Rest, <<Bit:1, Acc/bitstring>>);
bit_reverse(<<>>, Acc) ->
  Acc.

在对位串进行模式匹配时,要注意最后一个片段(segment)的匹配格式,需要加上binary(bytes)/bitstring(bits):

1
2
3
4
5
6
7
To match out the rest of a binary, specify a binary field without size:
foo(<<A:8,Rest/binary>>) ->
The size of the tail must be evenly divisible by 8.
 
To match out the rest of a bitstring, specify a field without size:
foo(<<A:8,Rest/bitstring>>) ->
There is no restriction on the number of bits in the tail.

--EOF--

erlang file-related modules exercises

1. When an Erlang file X.erl is compiled, a file X.beam is created (if the compilation succeeded). Write a program that checks whether an Erlang module needs recompiling. Do this by comparing the last modified time stamps of the Erlang and beam files concerned.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-export([check/2]).
-include_lib("kernel/include/file.hrl").
 
%% @doc Tell whether module Mod in directory Dir needs recompiling, by
%% comparing the modification times of Mod.erl and Mod.beam.
%% Returns error when either file's information cannot be read.
-spec check(string(), string()) -> boolean() | error.
check(Dir, Mod) ->
  Source = filename:join([Dir, Mod ++ ".erl"]),
  Beam = filename:join([Dir, Mod ++ ".beam"]),
  recompile(mtime(Source), mtime(Beam)).
 
%% @doc Last-modified time of File as stored in its file_info record, or
%% error when the file information cannot be read.
mtime(File) ->
  case file:read_file_info(File) of
    {ok, #file_info{mtime = Modified}} -> Modified;
    _ -> error
  end.
 
%% @doc Decide from the two modification times whether to recompile.
%% Returns error if either time is error; otherwise true when the source
%% time is not older than the beam time, and false when it is.
recompile(ErlTime, BeamTime) ->
  case {ErlTime, BeamTime} of
    {error, _} -> error;
    {_, error} -> error;
    _ -> ErlTime >= BeamTime
  end.

2. Write a program to compute the MD5 checksum of a small file, and use the BIF erlang:md5/1 to compute the MD5 checksum of the data.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-export([md5sum/1]).
 
%% @doc MD5 checksum of File as a lowercase hexadecimal string, computed
%% in one go over the whole file contents. Returns error if the file
%% cannot be read.
-spec md5sum(string()) -> string() | error.
md5sum(File) ->
  case file:read_file(File) of
    {ok, Contents} ->
      %% Walk the digest four bits at a time: high nibble first, then low.
      [hex(Nibble) || <<Nibble:4>> <= erlang:md5(Contents)];
    _ ->
      error
  end.
%% @doc Character for one hexadecimal digit value: $0..$9 for values below
%% ten, lowercase letters starting at $a from ten upwards.
hex(Digit) ->
  if
    Digit < 10 -> Digit + $0;
    true -> Digit - 10 + $a
  end.

3. Repeat the previous exercise for a large file (say a few hundred megabytes). This time read the file in small chunks, using erlang:md5_init, erlang:md5_update, and erlang:md5_final to compute the MD5 sum of the file.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-export([md5sum1/1]).
 
%% @doc MD5 checksum of File as a lowercase hexadecimal string, computed
%% incrementally so that large files need not fit in memory.
%% Returns error if the file cannot be opened or read.
-spec md5sum1(string()) -> string() | error.
md5sum1(File) ->
  case file:open(File, [read, binary, raw]) of
    {ok, FD} ->
      %% Close the descriptor on every path, including read errors.
      try
        read(FD, 0, erlang:md5_init())
      after
        file:close(FD)
      end;
    {error, _} ->
      error
  end.
 
%% Feed the file to the MD5 context in chunks of up to 1000 bytes, starting
%% at offset Loc. Does not close FD; that is left to the caller.
read(FD, Loc, Context) ->
  case file:pread(FD, Loc, 1000) of
    {ok, Bin} ->
      %% Advance by the number of bytes actually read.
      read(FD, Loc + byte_size(Bin), erlang:md5_update(Context, Bin));
    eof ->
      %% Walk the digest four bits at a time: high nibble first, then low.
      [hex(Nibble) || <<Nibble:4>> <= erlang:md5_final(Context)];
    {error, _} ->
      error
  end.

4. Write a cache mechanism that computes the MD5 checksum of a file and remembers it in a cache, together with the last modified time of the file. When we want the MD5 sum of a file, check the cache to see whether the value has been computed, and recompute it if the last modified time of the file has been changed.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
-export([get_md5/1, init_cache/1]).
-include_lib("kernel/include/file.hrl").
-record(finfo, {filename, md5, mtime}).
 
%% @doc MD5 checksum of File, served from the cache kept in the process
%% dictionary. The checksum is computed and cached on first use, and
%% recomputed whenever the file's current modification time differs from
%% the cached one (this includes the file having become unreadable).
%% Returns error if the dictionary entry for File is not a cache record.
-spec get_md5(string()) -> string() | error.
get_md5(File) ->
  case get(File) of
    undefined ->
      cache_one_file(File);
    #finfo{mtime = CachedMtime, md5 = Md5} ->
      %% get_mtime/1 is the same helper cache_one_file/1 stores the time with.
      case get_mtime(File) of
        CachedMtime -> Md5;
        _Changed -> cache_one_file(File)
      end;
    _ -> error
  end.
 
%% @doc Fill the cache with the checksum of every file matching "*.erl"
%% found under Dir by files/2.
-spec init_cache(string()) -> done.
init_cache(Dir) ->
  cache_all(files(Dir, "*.erl")).
 
%% @doc Cache the checksum of every file in the list, in order; returns done.
cache_all(Files) ->
  lists:foreach(fun(File) -> cache_one_file(File) end, Files),
  done.
 
%% @doc Compute the checksum of File, store it in the process dictionary
%% together with the file's modification time, and return the checksum.
%% NOTE(review): three/1 is not defined in this snippet; presumably it is
%% the chunked MD5 function from the previous exercise - confirm.
cache_one_file(File) ->
  Digest = three(File),
  put(File, #finfo{filename = File, md5 = Digest, mtime = get_mtime(File)}),
  Digest.
 
%% @doc Last-modified time of File as stored in its file_info record, or
%% error when the file information cannot be read.
get_mtime(File) ->
  case file:read_file_info(File) of
    {ok, #file_info{mtime = Modified}} -> Modified;
    _ -> error
  end.
 
%% File search utilities.
%% @doc List the files under Dir, searched recursively, whose full path
%% matches the shell-style pattern Reg, in the order they were found.
files(Dir, Reg) ->
  Collect = fun(File, Acc) -> [File | Acc] end,
  Found = files(Dir, xmerl_regexp:sh_to_awk(Reg), true, Collect, []),
  lists:reverse(Found).
 
%% @doc Fold Fun over the matching files in Dir, starting from Acc.
%% A directory that cannot be listed contributes nothing: Acc is returned as is.
files(Dir, Reg, Recursive, Fun, Acc) ->
  case file:list_dir(Dir) of
    {error, _} -> Acc;
    {ok, Entries} -> find_files(Entries, Dir, Reg, Recursive, Fun, Acc)
  end.
 
%% @doc Walk the directory entries of Dir one by one. A regular file whose
%% full path matches Reg is passed to Fun together with the accumulator;
%% a sub-directory is descended into when Recursive is true; anything else
%% is skipped. Returns the final accumulator.
find_files([], _Dir, _Reg, _Recursive, _Fun, Acc) ->
  Acc;
find_files([Entry | Rest], Dir, Reg, Recursive, Fun, Acc0) ->
  Path = filename:join([Dir, Entry]),
  Acc1 =
    case file_type(Path) of
      regular ->
        case re:run(Path, Reg, [{capture, none}]) of
          match -> Fun(Path, Acc0);
          nomatch -> Acc0
        end;
      directory ->
        case Recursive of
          true -> files(Path, Reg, Recursive, Fun, Acc0);
          false -> Acc0
        end;
      error ->
        Acc0
    end,
  find_files(Rest, Dir, Reg, Recursive, Fun, Acc1).
 
%% @doc Classify Filename as regular or directory from its file_info type.
%% Every other type, and any failure to read the file information, gives error.
file_type(Filename) ->
  case file:read_file_info(Filename) of
    {ok, #file_info{type = regular}} -> regular;
    {ok, #file_info{type = directory}} -> directory;
    _ -> error
  end.

Exercises from 『Programming Erlang (2nd Edition)』.

--EOF--