Erlang顺序编程 [2]

1. 函数
函数参数的个数称为函数的目,例如sum/2表示函数名为sum,参数个数为2个。Erlang中判断两个函数是否相同的标准是其函数名和函数目是否严格一致,跟参数类型没半毛钱关系。因此,sum/1和sum/2除了恰好名称相同之外,再无其他联系。如下代码:

1
2
3
4
sum(L) -> sum(L, 0).
 
sum([], N) -> N;
sum([H|T], N) -> sum(T, N+H).

虽然看似有三个函数声明,但实际上只有两个函数sum/1和sum/2。sum/2由两个函数子句组成,子句之间用分号;隔开,如果子句sum([], N) -> N后面用了句号,那么编译器会抛function sum/2 already defined异常。

多个函数子句之间,Erlang根据子句出现的顺序对参数进行模式匹配,一旦匹配到,就会执行该子句后面表达式,所以子句的顺序非常重要。如果所有子句都匹配失败,则会抛出一个no function clause matching的运行时异常。

2. 匿名函数
匿名函数是没有名字的函数,用fun定义,形式为fun(参数) -> Expr end.。例如:

1
2
3
4
1> Double = fun(X) -> 2*X end.
#Fun<erl_eval.6.82930912>
2> Double(1).
2

当存在多个匿名函数子句时,fun函数形式为fun(参数1) -> Expr1; (参数2) -> Expr2 end.。例如:

1
2
3
4
5
6
1> Fun = fun({plus, X, Y}) -> X + Y; ({minus, X, Y}) -> X - Y end.
#Fun<erl_eval.6.82930912>
2> Fun({plus, 2, 1}).
3
3> Fun({minus, 2, 1}).
1

fun函数还可作为函数的参数,也可作为函数的结果。fun函数作为函数的参数的例子在lists模块的导出函数中就有用到:

1
2
3
4
5
6
7
 
1> Double = fun(X) -> 2*X end.
#Fun<erl_eval.6.82930912>
2> L = [1, 2, 3, 4].
[1,2,3,4]
3> lists:map(Double, L).
[2,4,6,8]

当fun函数作为某个函数的返回结果时,那个函数就相当于一个用来生成函数的函数。例如:

1
2
3
4
5
6
7
8
9
10
1> Fruit = [apple, pear, orange].
[apple,pear,orange]
2> MakeTest = fun(L) -> (fun(X) -> lists:member(X, L) end) end.
#Fun<erl_eval.6.82930912>
3> IsFruit = MakeTest(Fruit).
#Fun<erl_eval.6.82930912>
4> IsFruit(apple).
true
5> IsFruit(dog).
false

MakeTest本身是一个匿名函数,它接受列表L作为参数,并且生成一个匿名函数IsFruit,IsFruit匿名函数用来判断参数是否是列表L中的成员。

3. 列表解析
列表解析是生成列表的一种方式,记号为[ F(X) || X <- L ],它表示由F(X)组成一个列表,其中X取值于列表L。例如:

1
2
1> [ 2 * X || X <-  [1, 2, 3, 4] ].
[2,4,6,8]

显然这种方式比用list:map方式实现相同的功能更加简单:

1
2
1> lists:map(fun(X) -> 2 * X end,  [1, 2, 3, 4]).
[2,4,6,8]

列表解析还有以下更为常健的形式:

1
[ X || Qualifier1, Qualifier2,]

其中,X是任意一个表达式,每个限定词Qualifier可以是一个生成器或者过滤器,用来限定或者过滤出符合条件的列表值,其中生成器部分必须,并且生成器要在过滤器之前,例如:

1
2
1> [X || X <- [-1, 0, 1], X > 0].
[1]

列表解析可以用来解决一些经典问题,见如下两个例子:
1) 毕达哥拉斯三元组:给定整数N,求出符合条件的A2 + B2 = C2,A + B + C < N的所有A,B,C集合。

1
2
3
4
5
6
7
8
pythag(N) ->
     [{A,B,C} || 
          A<-lists:seq(1, N), 
          B<-lists:seq(1, N), 
          C<-lists:seq(1, N), 
          A*A + B*B =:= C*C, 
          A + B + C < N
     ].

2) 变位词:求出一个字符串的全排列。

1
2
3
perms([]) -> [[]];
perms(L) ->
     [[H|T] || H<-L, T<-perms(L -- [H])].

4. 异常
遇到系统内部错误时,可以有三种方式显示抛出异常:exit(Reason), throw(Reason), erlang:error(Reason)。用try…catch...语句捕捉异常的语法为:

1
2
3
4
5
6
7
8
9
10
11
try FuncOrExpressionSequence of 
     Pattern1 [ when Guard1 ] -> Expression1;
     Pattern2 [ when Guard2 ] -> Expression2;catch
     ExceptionType: ExPattern1 [ when ExGuard1 ] -> ExExpression1;
     ExceptionType: ExPattern2 [ when ExGuard2 ] -> ExExpression2;after
     AfterExpressions
end

try…catch…流程:对FuncOrExpressionSequence求值,如果不产生异常,则进入正常的PatternX分支,返回ExpressionX。如果产生异常,则进入ExceptionType: ExPatternX分支,其中ExceptionType为以下三个值之一:throw, exit和error,若为标明,默认为throw。after区块可选,如果有after区块,则无论FuncOrExpressionSequence求值过程中有没有异常抛出,AfterExpressions都会执行,但是表达式的值会被舍去。

以下为try…catch...的示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-module(test).
-compile(export_all).
 
generate_exception(1) -> reason;
generate_exception(2) -> throw(reason);
generate_exception(3) -> exit(reason);
generate_exception(4) -> exit(other);
generate_exception(5) -> {'EXIT', reason};
generate_exception(6) -> erlang:error(reason).
 
demo() ->
     [catcher(X) || X <- lists:seq(1, 6)].
 
 
catcher(N) ->
     try generate_exception(N) of
          Val -> {N, normal, Val}
     catch
          throw: X -> {N, caught, thrown, X};
          exit: reason -> {N, caught, exited, reason};
          exit: _ -> {N, caught, exited, other};
          error: X-> {N, caught, error, X}
     end.

运行结果如下:

1
2
3
4
5
6
7
1> test:demo().
[{1,normal,reason},
 {2,caught,thrown,reason},
 {3,caught,exited,reason},
 {4,caught,exited,other},
 {5,normal,{'EXIT',reason}},
 {6,caught,error,reason}]

5. 模式中使用匹配操作符
例如:

1
2
3
4
5
6
7
8
1> Func1 = fun({tag, X} = Z) -> {tag, X, Z} end.
#Fun<erl_eval.6.82930912>
2> Func1({tag, 1}).
{tag,1,{tag,1}}
3> Func2 = fun(Z = {tag, X}) -> {tag, X, Z} end.
#Fun<erl_eval.6.82930912>
4> Func2({tag, 2}).
{tag,2,{tag,2}}

为了重复进行模式匹配,可以将某个模式第一次出现时赋给一个临时变量,之后就可以用该临时变量替换那个模式。示例中将{tag, X}与临时变量Z绑定,更加高效(以该元组作为参数调用其他函数时避免创建新元组)同时更少出错,Z与{tag, X}绑定时,两者出现的左右顺序无关。

6. 基本类型
Erlang的基本类型之间可以进行比较操作,它们之间的优先级为:
number < atom < reference < fun < port < pid < tuple < list < binary 意思是所有数值类型都比原子类型小,表达式1 < a返回true;所有元组比列表小,表达式{1} < [1]返回true。 Erlang数值类型分为整形和浮点型。整形变量可代表的数据长度仅受限于可用的内存,它有两种表示方法:传统语法和K进制语法,后者的表示方法为K#Digits,K为进制,2<=K<=36,大于十进制时,Digits可用[a-zA-Z]表示数值10-35。例如2#10表示2,11#A表示10,36#Z表示35,36#10表示36。浮点数内部以IEEE 754的64bit格式表示,因此其可表示的范围为-10323~10308

整数和浮点数的比较有以下一些原则:
1) 如果比较双方一个是整数,一个是浮点数,则整数在比较之前会转换为浮点数。
2) 如果比较双方都是整数或是浮点数,则以原来类型进行比较。

7. 布尔表达式
Erlang的布尔表达式有两套运算符:orelse/andalso和or/and。前者称为短路布尔表达式,当表达式的左边能决定最终运算结果时,右边表达式不进行运算。例如Expr1 orelse Expr2,如果Expr1为true,那么Expr2不会进行求值。而or/and表达式中,无论表达式的值是否可以从左边参数推断出来,右边表达式必须进行求值。

摘自Joe Armstrong - 『Programming Erlang』。

--EOF--

如何检测RabbitMQ工作状态

RabbitMQ的management插件提供了一个基于HTTP的接口GET /api/aliveness-test/%2F来检测RabbitMQ工作状态,aliveness-test是声明在%2F(默认vhost '/')下的一个测试队列,如果返回响应码200 OK,响应体{"status":"ok"},就可以认为当前RabbitMQ节点处在正常工作状态。

大部分情况下,这个接口屡试不爽。然而工作中碰到过一个场景,此接口出现过非预期的结果,具体原因尚不明确,只有初步结论:通过HTTP接口测试RabbitMQ工作状态不一定可靠。

出现非预期结果的场景如下:
有两台通过Openstack nova api创建的KVM虚拟机,在虚拟机里分别部署RabbitMQ(称为rabbit1@192.168.1.100, rabbit2@192.168.1.101),rabbit1@192.168.1.100和 rabbit2@192.168.1.101通过join_cluster命令建好集群模式。另外,在两个虚拟机内分别运行心跳程序,心跳程序每5s运行一次,它先调用GET /api/aliveness-test/%2F获取RabbitMQ工作状态,如果运行正常,则向管理服务器发送一个心跳包。

假如这时候通过nova的POST v2/{tenant_id}/servers/action接口关闭或者重启虚拟机192.168.1.100:

1
2
3
4
5
6
7
8
9
10
11
POST /${tenant_id}/servers/${server_id}/action
 
HOST: nova.server.xxx.org
Content-Type: application/json
Accept: applicaton/json
 
{
    "reboot" : {
        "type" : "HARD"
    }
}
1
2
3
4
5
6
7
8
9
POST /${tenant_id}/servers/${server_id}/action
 
HOST: nova.server.xxx.org
Content-Type: application/json
Accept: applicaton/json
 
{
    "os-stop": null
}

理论上,此时另一个RabbitMQ节点的心跳程序不应受到影响才对,但实际上,调用http://192.168.1.101:15672/api/aliveness-test/%2F会发现连接被阻塞住,无法返回。直到较长时间后接口才会返回。在这个HTTP接口被阻塞的过程中,RabbitMQ端口5672、HTTP API端口15672均正常开启,服务正常,但是因为心跳检测异常,管理服务器早已判定这两个节点离线,这就造成了误报。

到目前为止,这种诡异的情况只出现在这么一个场景下。其他情况比如ssh到其中一台虚拟机,通过reboot(8), shutdown(8)重启或者关闭,另一个RabbitMQ节点的HTTP接口不会被阻塞。此外,我了解到通过nova api关闭虚拟机,就相当于按了这台机器的电源键(强制关机、断电),于是尝试过两台物理机组成一个RabbitMQ节点集群,环境分别为OS X 10.8.2 + Erlang R15B03 + RabbitMQ 3.1.3和Win7 + Erlang R15B03-1 + RabbitMQ 3.1.5,在两个节点正常运行的情况下,强制关闭其中一台机器,也未出现上述问题。

既然通过HTTP的方式检测服务状态来发心跳的方法不靠谱,于是想到了通过检测TCP端口号的方式,以Python为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import socket
 
def check_aliveness(ip, port):
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sk.settimeout(1)
    try:
        sk.connect((ip,port))
        print 'service is OK!'
        return True
    except Exception:
        print 'service is NOT OK!'
        return False
    finally:
        sk.close()
    return False
 
check_aliveness('127.0.0.1', 15672)

5672端口和15672端口,检测哪个端口更合理些?RabbitMQ默认会通过日志($RABBITMQ_HOME/var/log/rabbitmq/*.log)记录5672端口的客户端TCP连接/释放信息,如果长期运行,正常的AMQP客户端信息会被心跳连接信息淹没。所以,还是检测15672端口合理一些。

更简单的可以通过Bash脚本直接检查5672端口有没开启:

1
2
3
$ port=5672 
$ netstat -an | grep LISTEN | grep $port
tcp46      0      0  *.5672                 *.*                    LISTEN

--EOF--

Erlang顺序编程 [1]

1. 变量不变性
Erlang中的变量分为绑定变量和自由变量。绑定变量是指经过一次赋值操作的变量,此后,该变量不能被再次改变,也就是说,一个变量在其可见域内只能赋值一次。除了绑定变量之外的变量都是自由变量,经过一次绑定后成为绑定变量。不要将‘=’理解为赋值操作,而是将其理解为模式匹配能更好理解变量的单次赋值特性。例如:

1
2
3
4
1> A=1.
1
2> A=2.
** exception error: no match of right hand side value 2

赋值之前,A是自由变量,通过模式匹配符=将A与1进行绑定,此后A的值不能再变,所以A=2模式匹配失败。变量绑定的本质是模式匹配,而其实现方式是:一个绑定变量就是一个指针,这个指针指向存放那个值的存储区,那个值是无法改变的。假如将某个变量绑定到一个空列表[],再往[]中append元素,此时,变量A还是指向[],要引用到添加新元素后的列表,必须将新列表绑定到新的变量。这点与Java等语言的列表引用非常不同。

Shell中可以通过b()查看所有当前已绑定的变量,f()命令释放所有绑定过的变量,f(X)释放变量X的绑定。

2. 浮点数运算
除号‘/’操作符永远返回浮点数,即使进行两个整数的除法运算,结果也会自动转换为浮点数。div和rem则只适用于整数的除和取余。例如:

1
2
3
4
5
6
7
8
9
10
11
12
1> 5 / 2.
2.5
2> 4 / 2.
2.0
3> 4.0 div 2.
** exception error: an error occurred when evaluating an arithmetic expression
     in operator  div/2
        called as 4.0 div 2
4> 5 div 2.
2
5> 5 rem 2.
1

3. 原子
原子(atom)是一串以小写字母开头,后跟数字字母、下划线或@的字符,一个原子的值就是原子自身。用单引号引起来的字符也是原子,通过这种形式,原子可以用大写字母开头或者包含其他可打印字符,如'Var', 'an atom'等。

4. 列表
列表的第一个元素称为列表的头(Head),剩下的称为尾(Tail),列表的Head可以是任何东西,但是列表的Tail通常还是一个列表。访问列表的头是非常高效的操作,实际上所有的列表处理函数都是从提取表头开始的,关于表头操作和表尾操作的性能差异可参考『Erlang列表操作性能分析』

当用[...|T]的方式来构造一个列表时,应该尽量保证T是一个列表。如果T是一个列表,那么新的列表就是正规形式的;反之,称为非正规列表,很多Erlang库都假定列表是正规的,可能不能正确处理非正规列表。两种列表形式如下:

1
2
3
4
1> [a|b]. %非正规列表
[a|b]
2> [a|[b]]. %正规列表
[a,b]

5. 字符串
Erlang中的字符串是用双引号(")括起来的一串字符,它的本质上是一个整数列表,列表中的每一个元素都是相应字符的整数值。只有列表中所有整数都是可打印字符(ISO 8859-1编码,也称Latin-1编码)时,Shell才把它当做字符串来打印。例如:

1
2
3
4
1> A = "erlang".
"erlang"
2> [1|A].
[1,101,114,108,97,110,103]

在列表头部加个不可打印字符的整数值,Erlang字符串立刻暴露出了它的本质。

另外,通过$符号可以显示一个可打印字符的整数值。例如:

1
2
3
4
5
6
1> $a.
97
2> $ .
32
3>.
234

摘自Joe Armstrong - 『Programming Erlang』。

--EOF--

RabbitMQ消息过期时间

RabbitMQ针对队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。

1. 设置队列属性。
通过队列属性设置消息TTL的方法是在queue.declare方法中加入x-message-ttl参数,单位为ms。
SDK设置如下:

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

HTTP接口调用如下:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT 
-d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}'
http://localhost:15672/api/queues/{vhost}/{queuename}

如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分代替RabbitMQ 3.0.0以前支持的immediate参数,之所以说部分代替,是因为immediate参数在投递失败会有basic.return方法将消息体返回,详见『AMQP协议mandatory和immediate标志位区别』

2. 设置消息属性。
针对每条消息设置TTL的方法是在basic.publish方法中加入expiration的属性参数,单位为ms。
SDK设置如下:

1
2
3
4
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("myexchange", "routingkey", properties, messageBodyBytes);

HTTP接口调用如下:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST 
-d'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my 
body","payload_encoding":"string"}' 
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而第二种方法里,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到消费者之前判定的,为什么两者得处理方法不一致?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。

--EOF--