月度归档:2014年04月

erlang file-related modules execises

1. When an Erlang file X.erl is compiled, a file X.beam is created (if the compilation succeeded). Write a program that checks whether an Erlang module needs recompiling. Do this by comparing the last modified time stamps of the Erlang and beam files concerned.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-export([check/2]).
-include_lib("kernel/include/file.hrl").
 
-spec check(string(), string()) -> boolean() | error.
check(Dir, Mod) ->
  ErlTime = mtime(filename:join([Dir, Mod ++ ".erl"])),
  BeamTime = mtime(filename:join([Dir, Mod ++ ".beam"])),
  recompile(ErlTime, BeamTime).
 
mtime(File) ->
  case file:read_file_info(File) of
    {ok, Info} ->
      Info#file_info.mtime;
    _ -> error
  end.
 
recompile(error, _) ->
  error;
recompile(_, error) ->
  error;
recompile(ETime, BTime) when ETime >= BTime ->
  true;
recompile(_, _) ->
  false.

2. Write a program to compute the MD5 checksum of a small file, and use the BIF erlang:md5/1 to compute the MD5 checksum of the data.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-export([md5sum/1]).
 
-spec md5sum(string()) -> string() | error.
md5sum(File) ->
  case file:read_file(File) of
    {ok, Context} ->
      L = binary_to_list(erlang:md5(Context)),
      lists:flatten(lists:map(fun(X) -> [hex(X div 16), hex(X rem 16)] end, L));
    _ -> error
  end.
hex(N) when N < 10 ->
  $0 + N;
hex(N) ->
  $a + N - 10.

3. Repeat the previous exercise for a large file (say a few hundred megabytes). This time read the file in small chunks, using erlang:md5_init, erlang:md5_update, and erlang:md5_final to compute the MD5 sum of the file.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-export([md5sum/1]).
 
-spec md5sum1(string()) -> string() | error.
md5sum1(File) ->
  {ok, FD} = file:open(File, [read, binary, raw]),
  Context = erlang:md5_init(),
  read(FD, 0, Context).
 
read(FD, Loc, Context) ->
  case file:pread(FD, Loc, 1000) of
    {ok, Bin} ->
      NewContext = erlang:md5_update(Context, Bin),
      read(FD, Loc + 1000, NewContext);
    eof ->
      L = binary_to_list(erlang:md5_final(Context)),
      file:close(FD),
      lists:flatten(lists:map(fun(X) -> [hex(X div 16), hex(X rem 16)] end, L));
    _ ->
      error
  end.

4. Write a cache mechanism that computes the MD5 checksum of a file and remembers it in a cache, together with the last modified time of the file. When we want the MD5 sum of a file, check the cache to see whether the value has been computed, and recompute it if the last modified time of the file has been changed.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
-export([get_md5/1, init_cache/1]).
-include_lib("kernel/include/file.hrl").
-record(finfo, {filename, md5, mtime}).
 
-spec get_md5(string()) -> string() | error.
get_md5(File) ->
  case get(File) of
    undefined ->
      cache_one_file(File);
    #finfo{mtime = Mtime, md5 = Md5} ->
      NewMtime = mtime(File),
      case Mtime < NewMtime of
        true -> cache_one_file(File);
        false -> Md5
      end;
    _ -> error
  end.
 
-spec init_cache(string()) -> done.
init_cache(Dir) ->
  Flist = files(Dir, "*.erl"),
  cache_all(Flist).
 
cache_all([]) ->
  done;
cache_all([H | T]) ->
  cache_one_file(H),
  cache_all(T).
 
cache_one_file(File) ->
  Md5 = three(File),
  FRecord = #finfo{filename = File, md5 = Md5, mtime = get_mtime(File)},
  put(File, FRecord),
  Md5.
 
get_mtime(File) ->
  case file:read_file_info(File) of
    {ok, Info} ->
      Info#file_info.mtime;
    _ -> error
  end.
 
%%files search utils
files(Dir, Reg) ->
  Fun = fun(File, Acc) -> [File | Acc] end,
  lists:reverse(files(Dir, xmerl_regexp:sh_to_awk(Reg), true, Fun, [])).
 
files(Dir, Reg, Recursive, Fun, Acc) ->
  case file:list_dir(Dir) of
    {ok, Files} -> find_files(Files, Dir, Reg, Recursive, Fun, Acc);
    {error, _} -> Acc
  end.
 
find_files([File | T], Dir, Reg, Recursive, Fun, Acc0) ->
  Fullname = filename:join([Dir, File]),
  case file_type(Fullname) of
    regular ->
      case re:run(Fullname, Reg,[{capture, none}]) of
         match ->
           Acc = Fun(Fullname, Acc0),
           find_files(T, Dir, Reg, Recursive, Fun, Acc);
        nomatch ->
          find_files(T, Dir, Reg, Recursive, Fun, Acc0)
      end;
    directory ->
      case Recursive of
        true ->
          Acc =  files(Fullname, Reg, Recursive, Fun, Acc0),
          find_files(T, Dir, Reg, Recursive, Fun, Acc);
        false ->
          find_files(T, Dir, Reg, Recursive, Fun, Acc0)
      end;
    error ->
      find_files(T, Dir, Reg, Recursive, Fun, Acc0)
  end;
 
find_files([], _,_,_,_, Acc) ->
  Acc.
 
file_type(Filename) ->
  case file:read_file_info(Filename) of
    {ok, Facts} ->
      case Facts#file_info.type of
        regular -> regular;
        directory -> directory;
        _ -> error
      end;
    _ -> error
  end.

Exercises from 『Programming Erlang (2Edtion)』.

--EOF--

『TCP/IP Illustrated, Vol. 1』 - 链路层

一、 链路层作用:

1. 为IP模块发送和接收IP数据报
2. 为ARP模块发送ARP请求和接收ARP应答
3. 为RARP模块发送RARP请求和接收RARP应答

二、链路层协议:

TCP/IP协议族支持多种链路层协议,取决于网络所使用的硬件,如以太网(Ethernet V2、802.3等)、令牌环网、FDDI、RS-232串行线路(SLIP、PPP协议等)等。

三、以太网帧格式

以太网帧是链路层最常见协议。

两个标准:
1. Ethernet V2 (RFC 894): 以太网帧格式
2. 802.3 (RFC 1042): IEEE 802.2/802.3格式

Ethernet V2是事实标准,因为第一个大规模使用TCP/IP的bsd unix系统出现在RFC 894和RFC 1042之间。

两种帧格式如下:
tcpip-frame

IEEE 802.2/802.3(RFC 1042)和Ethernet(RFC 894)帧格式

区别: 根据第13-14字节区分两种帧格式,IEEE 802用两个字节表示长度,Ethernet V2用来表示类型。所以,当这两个字节的值大于1500时,表示的是Ethernet V2帧格式,否则是IEEE 802帧格式。

帧的长短有限制:有效帧长为64-1518字节(实际上,每一帧还有8个前导字节,用于帧同步和定界,不过已经在物理层去掉了,所以在链路层看不到这8个字节)。设置帧的最大传输单元(MTU)为了避免某主机长时间占用信道,上层数据数过大时需要分片。设置最短帧长是为了便于冲突检测(CSMA/CD),上层数据过小时需要进行补齐(Padding)。

四、环回接口(Loopback):

作用:允许运行在同一台主机的客户端程序和服务器程序通过TCP/IP通信。

loopback

环回接口处理IP数据报的过程

关键点:
1. 传给环回地址(一般为127.0.0.1)的任何数据均作为IP输入。
2. 传给广播/多播地址的数据报需复制一份传给环回接口,然后送到以太网上。因为广播/多播的定义中包含主机本身。
3. 任何传给主机IP地址的数据均送到环回接口。

上述关键点说明,一个发送给主机本身的包不会在以太网上出现。发送给环回地址(127.0.0.1)或其他本主机地址的包,会走完TCP/IP协议栈,但是不会走到物理层,因此可以用来验证链路层及上层的通信情况。

一些查看链路层协议相关参数的命令:
1. netstat -i
2. ip link
3. ifconfig -a

来自 『TCP/IP Illustrated, Vol. 1: The Protocols』。

--EOF--

WebSocket via nginx

WebSocket的握手流程[1]如下:
浏览器请求:

1
2
3
4
5
6
7
8
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com

服务器响应:

1
2
3
4
5
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
Sec-WebSocket-Protocol: chat

细节不展开了,只需关注浏览器和服务器之间如何通过Upgrade和Connection header进行协商,将一个HTTP连接升级为WebSocket。

nginx 1.3版本以后开始支持WebSocket的反向代理。因为WebSocket的握手主要是通过标准HTTP请求和响应来实现的,因此对于反向代理服务器,nginx在7层只要正确处理好WebSocket协议的握手流程,使得本来在tcp连接上跑HTTP协议的,现在用来跑WebSocket,只要握手完成剩下的交互就是tcp层面的事,跟HTTP协议没有关系了。注意,如果nginx版本小于1.3,对WebSocket进行反向代理时会出现“WebSocket is closed before the connection is established.”的警告。

nginx支持WebSocket需进行以下配置(官方文档):

1
2
3
4
5
6
location /chat/ {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}

HTTP 1.1的长连接是WebSocket协商的基础,另外因为Upgrade和Connection header在HTTP协议中都是hop-by-hop header,因此在反向代理的时候需要把这两个header回填,发送到服务器端。从全局来看,就是客户端(浏览器)同nginx之间建立一条tcp长连接,nginx和后端服务器之间建立一条(或多条,视upstream的server节点个数)tcp长连接,这些长连接都是用来传递WebSocket数据帧。

以下是一个真实例子,使用场景来为:
Openstack组件中有个nova-novncproxy服务,支持用户通过浏览器以VNC的方式登陆虚拟机。现在有一个类似域名为https://console.ustack.com的Web管理平台,在VNC登陆虚拟机页面嵌入noVNC地址。这里的问题是,Web管理平台是https的,如果noVNC地址是http,那么高版本浏览器为了安全考虑都会屏蔽noVNC页面,理由是https页面内嵌了http页面;如果给noVNC页面也加个https支持,那就有点浪费了,给IP地址买张证书不划算,给域名买张证书不划算的同时还浪费一个域名。所以最好的方式就是在Web管理平台的前端nginx机器上为noVNC做个反向代理,用户看到的都是Web管理平台的域名,这样在交互上也显得统一。结合上文描述的nginx支持WebSocket的配置,用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#web管理平台
upstream web {
  server 127.0.0.1:8080;
  keepalive 80;
}
 
#noVNC
upstream vnc {
  server 111.26.23.10:6080;
  keepalive 80;
}
 
 
server {
  listen 443;
  server_name console.ustack.com;
  ssl on;
  ssl_certificate /etc/nginx/ssl/server.crt;
  ssl_certificate_key /etc/nginx/ssl/server.key;
 
  location / {
    proxy_pass http://web;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header REMOTE-HOST $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_http_version 1.1;
  }
 
  location /vnc {
    rewrite ^/vnc/(.*) /$1 break;
    proxy_pass http://vnc;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header REMOTE-HOST $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_http_version 1.1;
  }
 
  location /websockify {
    proxy_pass http://vnc;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header REMOTE-HOST $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "Upgrade";
    proxy_read_timeout 86400;
  }
}

其中,/websockify location就是用来反向代理WebSocket请求,proxy_read_timeout参数是为了避免长时间没有数据收发时连接被nginx关闭。

[1] WebSocket. http://en.wikipedia.org/wiki/WebSocket.

--EOF--

RabbitMQ能打开的最大连接数

RabbitMQ自带了显示能够接受的最大连接数,有2种比较直观的方式:
1. rabbitmqctl命令。

1
2
3
4
5
6
7
8
9
10
11
12
n$ rabbitmqctl status
Status of node 'rabbit@10-101-17-13' ...
[{pid,23658},
 ......
 {file_descriptors,
     [{total_limit,924},
      {total_used,10},
      {sockets_limit,829},
      {sockets_used,10}]},
 ......
]
...done.

2. rabbitmq_management WebUI插件。

本文关注当RabbitMQ可用连接数耗尽时客户端的影响以及如何增加最大连接数默认值。

RabbitMQ的socket连接数(socket descriptors)是文件描述符(file descriptors,fd)的一个子集。也就是说,RabbitMQ能同时打开的最大连接数和最大文件句柄数(文件系统,管道)都是受限于操作系统关于文件描述符数量的设置,两者是此消彼长的关系。初始时,可用socket描述符与可用fd数量的比率大概在0.8-0.9左右,这个值并不固定,当socket描述符有剩余时,RabbitMQ会使用尽量多的文件描述符用于磁盘文件读写。随着服务器建立越来越多的socket连接,文件句柄开始回收,数量减少。总之,RabbitMQ会优先将文件描述符用于建立socket连接,宁可牺牲频繁打开/关闭文件带来的磁盘操作性能损耗,这种取舍很容易理解,作为网络服务器,当然优先保障网络吞吐率了。因此,对于高并发连接数的多队列读写时,队列性能会稍微差那么一点,比如用RabbitMQ做RPC。

当服务器建立的socket连接已经达到限制(sockets_limit)时,服务器不再接受新连接。这里要区分清楚,RabbitMQ不再接收的是AMQP连接,而不是传输层的TCP连接,通过简单抓包分析即可清楚流程:

1
2
3
4
5
6
7
8
$ sudo tcpdump host 10.101.17.13 and port 5672
17:24:12.214186 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [S], seq 3319779561, win 65535, options [mss 1368,nop,wscale 5,nop,nop,TS val 1006381554 ecr 0,sackOK,eol], length 0
17:24:12.214231 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [S.], seq 1636058035, ack 3319779562, win 14480, options [mss 1460,sackOK,TS val 24529834 ecr 1006381554,nop,wscale 5], length 0
17:24:12.218795 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [.], ack 1, win 4110, options [nop,nop,TS val 1006381560 ecr 24529834], length 0
17:24:12.243184 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [P.], seq 1:9, ack 1, win 4110, options [nop,nop,TS val 1006381583 ecr 24529834], length 8
17:24:12.243201 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [.], ack 9, win 453, options [nop,nop,TS val 24529841 ecr 1006381583], length 0
17:24:22.247907 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [F.], seq 9, ack 1, win 4110, options [nop,nop,TS val 1006391550 ecr 24529841], length 0
17:24:22.284914 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [.], ack 10, win 453, options [nop,nop,TS val 24532352 ecr 1006391550], length 0

line 2-4是TCP握手包,成功建立TCP连接。line 5开始客户端向服务器端发送AMQP协议头字符串“AMQP0091”,共8个字节,开始AMQP握手。line 6是服务器回给客户端的ack包,但未发送AMQP connection.start方法,导致客户端一直等到超时(line 7-8),发送FIN包关闭TCP连接。至此,AMQP连接建立失败。

从客户端(Java SDK)来看上述这个过程,客户端通过ConnectionFactory实例的newConnection()方法创建一条AMQP连接。在网络层,它首先通过java.net.Socket与服务器建立一条TCP连接,发送协议协商字符串“AMQP0091”,然后启动MainLoop线程,通过封装的Frame实例来循环读取帧(readFrame()),注意readFrame()方法可能会有一个SocketTimeoutException的超时异常,这个超时时间是由socket实例setSoTimeout方法写入,默认是10s,由AMQConnection.HANDSHAKE_TIMEOUT常量指定。当超时发生在AMQP连接握手阶段时,就抛出一个SocketTimeoutException异常,发生在其他阶段(除心跳超时)时,什么都不做继续下一个循环:

1
2
3
4
Caused by: java.net.SocketTimeoutException: Timeout during Connection negotiation
 at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection...
 at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:59)
 at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:541)

这里的socket读取超时很容易跟连接超时搞混。连接超时由ConnectionFactory实例的setConnectionTimeout()方法指定,对应着网络层Socket实例connect()方法中的timeout参数,指的是完成TCP三次握手的超时时间;而读取超时是从socket中读取字节流的等待时间,前文已经说过,由Socket实例的setSoTimeout()指定。这在各种网络库中应该很常见,比如HttpClient。

私以为这种情况下更好的设计是应该由RabbitMQ主动断开与客户端的TCP连接,减少客户端等待时间。

最后一个问题,如何增加RabbitMQ的能够同时打开的连接数。通过前文可知,最大并发连接数由此进程可打开的最大文件描述符数量(乘以一个比例系数)决定,因此只要增加单个进程可打开的文件描述符数量即可。有几个常规方法,按作用范围可以归纳为以下几类:

1. 进程级别。在启动脚本rabbitmq-server中加入ulimit -n 10240命令(假设将最大文件描述符数量设置为10240,下同),相当于在shell中执行,由此shell进程fork出来的进程都能继承这个配置。

2. 用户级别。修改/etc/security/limits.conf文件,添加以下配置,重新登录生效:

1
2
user    soft    nofile    10240
user    hard    nofile    10240

3. 系统级别。

1
# echo 10240 > /proc/sys/fs/file-max

上述设置只是针对proc文件系统,相当于修改了操作系统的运行时参数,重启后失效。要想永久生效,需要修改/etc/sysctl.conf文件,加入配置项fs.file-max=10240。

一个进程能打开的最大文件描述符数量受限于上述三个级别配置中的最小值。理论上,系统级别的配置数值必须要大于用户级别,用户级别的要大于进程级别的,只有这样配置才是安全的,否则进程容易因为打开文件数量问题受到来自操作系统的种种限制。操作系统为什么要限制可打开的文件描述符数量?为了系统安全。因为文件描述符本质上是一种内存中的数据结构,如果不加以限制,很容易被进程无意或恶意耗尽内存,比如fork bomb

--EOF--