月度归档:2014年02月

AMQP 0-9-1协议笔记

0x00: 概览

AMQP(Version:0-9-1)协议定义了客户端和消息中间件(broker)的交互过程。它的目标是提供一个标准消息中间件技术,降低企业与系统的集成成本,向大众提供工业级的集成服务。

AMQP由AMQ Model和网络层协议两部分组成,具体来讲,前者由一系列服务器内的路由和消息存储以及一些规则组成,后者则是定义了协议的数据格式。AMQP协议是一个二进制协议,它的主要特征是:多Channel,协商式,异步,安全,可移植,高效。可分为功能层和传输层两个层次:

+------------------Functional Layer----------------+ 
|   Basic Transactions Exchanges Message queues    |
+--------------------------------------------------+
+------------------Transport Layer-----------------+
|      Framing  Content   Data representation      |
|     Error handling  Heart-beating   Channels     |
+--------------------------------------------------+

其中,功能层定义了一个命令集合,如图中划分的Basic、Transaction、Exchange、Queue等。传输层定义了实现上述命令所需的多Channel、心跳、帧格式、数据表示、错误处理等规范。

0x01: 基本架构

AMQ Model主要由exchange,queue, binding三大组件构成,三大组件的作用为:
exchange:从生产者程序接收消息,并路由到queue。exchange是一个匹配和路由引擎,只转发消息,不存储消息。
queue: 存储消息至内存或者磁盘,直到消息被消费者程序取走。
binding: 定义了exchange和queue的绑定关系,提供消息路由的标准。

一条AMQP消息由消息头加消息体构成,消息头是一系列属性的集合。生产者生产消息后,为消息加上routekey,发送到服务器端。exchange根据routekey将消息分发到queue,如果找不到相应queue,比如queue不存在,或者routekey未匹配到任何一个queue, 则exchange会将消息丢弃或者将其返回给生产者。一条消息可以存在于多个queue中,服务器端可以有多种实现方案,可以通过拷贝消息,可以通过引用计数的方式,等等。此时,对于每个queue来说,这条消息是唯一的。

生产者可以指定queue将消息存储在内存还是磁盘,还可以指定当queue没有消费者时,消息是否返回给生产者。消息在服务器端可以被持久化,可以设置优先级,同一队列中,高优先级消息会比低优先级的消息更早被投递。

服务器端不能删除消息头部信息,不能修改消息体内容,但是可以在消息头部添加信息。

vhost是服务器端一系列exchange、queue及相关概念的集合。每个连接都必须所属一个唯一的vhost,因此这个连接内的所有Channel都工作在同一个vhost里。客户端在连接握手的时候选择vhost,具体是在Connection.Open方法中指定,而Connection.Open方法是在认证之后,这意味着服务器端的认证是针对全局的,所有vhost共享。AMQP没有定义如何创建和配置vhost,这部分由实现方自己定义。

生产者不会将消息直接发送到queue, 否则会破坏AMQ model。但是AMQ Model会提供一个默认的exchange,当生产者不指定exchange时,消息都会发送到这个默认的exchange里,同时还提供一个默认的绑定关系,即当消息的routekey和queue名称一致时,exchange会将消息路由到这个queue里,这种模式将AMQP退化为普通的队列操作,用户无需理解exchange和binding的工作流程。

AMQP用“declare”声明exchange,它表示“如果不存在,则创建”,这个操作是幂等的。也提供删除exchange的操作,只是一般来说应用程序用不到。

exchange的类型有以下5种:
1. direct: bindkey为K,消息routekey为R,当R=K时转发。
2. fanout: 群发。
3. topic: bindkey为K,消息routekey为R,当R匹配K时转发。bindkey和routekey格式为由点号(.)分隔的一个或多个word,并且支持*和#通配符。每个word由A-Z,a-z,0-9之间的字符组成,*通配一个word,#通配0个或多个word。
4. header: binding为一个属性集合加x-match参数,当x-match为all时,消息头部必须匹配所有binding属性集合才转发;当x-match为any时,消息头部任意匹配binding属性集合中的一个就可以转发。关于属性匹配的规则是:
1) binding只包含name,不包含value时, 消息头部只需包含相同name,不管有没有value。
2) binding包含name-value,消息头部也要包含name-value,并且两者的name和value要分别一致。
5. system: 转发所有以"amq."开头的routingkey的消息给系统服务。

queue的声明有一些关键3个关键属性:
1. name: 队列名称,如果不填,则服务器会默认给一个。
2. exclusive: 标记当前队列是否是某条连接独享,当连接断开时,队列删除。
3. durable:标记此队列是否需要持久化。如果设置为false,当服务器重启时,队列删除。

queue是一个FIFO的缓冲区,但是在某些情况下,比如有多个消费者,消息设置优先级或者投递策略优化等因素都会破坏队列的FIFO特性(顺序性),唯一能保证队列FIFO特性的方法就是保证只有一个消费者,并且设置消息的优先级相同。queue中的一条消息只会投递给一个消费者,除非这个消费者投递失败(连接断开)或者消费者显式拒绝(Basic.Reject)。对于queue来说,一个Channel就是一个消费者。

Channel.Flow方法可以用于控制服务器端或者客户端的流量,它是能控制生产者发送过多的消息。pre-fetch机制可以控制消费者在ack消息之前服务器最多向其投递的消息数量,以保证服务质量(Qos),对消费者来说,pre-fetch机制比Channel.Flow方法更加优雅。

消息ack有两种方式:
1. 自动:服务器在把消息投递到客户端后(Basic.Deliver或Basic.Get-Ok)后直接将消息删除。
2. 显式:消费者显式发送消息ack方法(单条或者批量),服务器端收到ack方法后删除消息。

AMQP的错误处理有两个层次:
1. Channel异常:一个具体操作级别的错误(operational error)会抛出Channel异常,它不会影响到客户端的整体运行,只需关闭Channel即可,比如队列不存在、未获得操作权限等。
2. Connection异常:一个结构或者协议级别的错误(structural error)会抛出Connection异常,它会影响程序继续往下执行,需关闭整个连接,比如参数配置错误、方法乱序等。

当出现异常时,服务器端会给出3位数字的错误码和一个简单的错误描述文本,这和HTTP类似。

0x02: 命令

AMQP是一种wire-level协议。所谓的wire-level协议,我理解的是它类似API,但是抽象层次更低,API是以“类名.方法名”的方式去调用,而wire-level是直接以字节流的方式在网络层传输,类似HTTP,可以直接在TCP连接上发送"GET ..."字节流来实现。AMQP命令将接口与实现分离,接口是程序员友好的“类名+方法名”格式,而实现则是以帧的形式存在,AMQP协议定义了各个命令的帧格式。

AMQP的方法可以分为两类:
1. 同步请求-响应。通信一端发送一个请求命令(类名+方法名+参数),另一端返回一个响应命令(类名+方法名+响应值)。为简单起见,每个请求命令都对应一个唯一的一个响应命令。
2. 异步通知。通信一端发送一个请求命令,另一端无需返回响应。这种方法一般用在性能优先的场景,比如消息投递。

所有的AMQP命令都是上述的同步请求、同步响应和异步通知三种之一。

各个层次命令的对应关系表述如下(以Queue.Declare为例):

Queue.Declare
  queue=my.queue
  auto-delete=TRUE
  exclusive=FALSE

相应的帧格式为:

+--------+---------+----------+-----------+-----------+ 
|  Queue | Declare | my.queue |     1     |     0     |
+--------+---------+----------+-----------+-----------+ 
   class    method     name    auto-delete   exclusive

更高层次的API表述为:

queue_declare (session, "my.queue", TRUE, FALSE);

伪代码可表述为如下,它的含义是发出去一个同步请求方法后,一直等到与之匹配的响应方法为止,这期间如果收到一个异步方法,则会优先进行处理:

send request method to server 
repeat
  wait for response from server
  if response is an asynchronous method
    process method (usually, delivered or returned content) 
  else
    assert that method is a valid response for request
    exit repeat 
  end-if
end-repeat

AMQP命令可以分成6类,分别为:
1. Connection类。AMQP是面向连接的协议,连接可以多Channel复用。客户端和服务器端的时序为:
C:AMPQ0091 -> S:Start -> C:Start-OK -> S:Secure -> C:Secure-OK -> S:Tune -> C:Tune-OK -> C:Open -> S:Open-OK -> using connection -> Close -> Close-Ok
Close可由任意一端发起,由另一端响应。
2. Channel类。AMQP支持多Channel复用连接,多Channel可以将一条物理连接划分为多个逻辑连接,提升性能,并且实现“防火墙友好”。Channel之间互相独立,且可以同时处理不同的请求,所有Channel共享连接带宽。建议多线程语言环境下采用“channel-per-thread”编程模型。
3. Exchange类。
4. Queue类。
5. Basic类。封装了消息相关的方法。例如生产者发送消息(Basic.Publish)、开始/停止消费消息(Basic.Consume/Basic.Cancel)、服务器发送消息(Basic.Deliver、Basic.Return)、消息ack(Basic.Ack、Basic.Reject)、获取消息(Basic.Get)。
6. Transaction类。AMQP支持两种事务类型:
1)自动事务。服务器端每条生产者发送的消息和消费者发送的ack作为独立的事务,自动提交。
2)本地事务。服务器端缓存生产者发送的消息和消费者发送的ack,由客户端显式发送指令提交。
AMQP事务只有覆盖到生产者发送消息和消费者ack消息,并没有覆盖到服务器端向消费者投递消息,因此,客户端的rollback操作不会重新造成消息入队(requeue)或者投递消息(redeliver)。

0x03: 帧格式

TCP/IP是个流协议,没有内建机制为帧定界,常用的帧定界有三种方式:
1. 每条连接只发送一个帧。简单但是效率低下。
2. 每个帧结尾加特殊标记。简单但是解析效率低下。
3. 将帧的大小放置在每一帧头部。AMQP采用这种方式实现帧定界。

所有AMQP命令都能映射为帧。帧的格式如下:

0      1         3             7            size+7        size+8 
+------+---------+-------------+ +------------+ +-----------+ 
| type | channel |    size     | |   payload  | | frame-end | 
+------+---------+-------------+ +------------+ +-----------+
 octet    short      long size        octets         octet

所有帧都是由一个7字节长的帧头部(frame header),任意字节长度的payload,和一个字节长的帧尾(frame-end)构成。构成帧的数据类型包括位(bit),无符号整数(unsign integer),字符串(string)以及field table(类似map)。

AMQP有四种类型的帧:
type=1: method类型。
type=2: content header类型。
type=3: content body类型。
type=4: 心跳。
当任一端收到的type非上述4种之一,则可以判定协议非法,断开连接。

每个帧都携带一个Channel号,Channel号的可表示范围为0-65535,其中,0表示此帧应用于当前连接,1-65535表示此帧属于某个具体Channel。AMQP通过这种方式来实现多Channel连接复用。从逻辑上看,Channel与连接的关系如下:

    frames      frames      frames      frames
+-----------+-----------+-----------+-----------+ 
|  channel  |  channel  |  channel  |  channel  | 
+-----------+-----------+-----------+-----------+ 
|               socket connection               | 
+-----------------------------------------------+

帧头部的size字段表示payload的长度,这个长度不包含frame-end这个字节。

frame-end在AMQP帧中为固定值0xCE,它的作用是可进行简单的错误检测,说它“简单”是因为此处没有像数据链路层帧那样通过CRC来检测错误。任意一段在解析帧之前,需判断frame-end的有效性。

不同类型的帧有不同的payload:
1. method帧。method帧的payload格式如下:

 0          2           4 
 +----------+-----------+-------------- - - 
 | class-id | method-id | arguments... 
 +----------+-----------+-------------- - -
    short       short       ...

其中,class-id为类id,method-id为方法id。AMQP类和方法的具体定义可以通过查阅amqp-xml-doc0-9.pdf文档

2. content header帧。content帧是一些携带消息应用数据的帧,比如Basic.Publish或者Basic.Deliver方法后面必须紧跟着content帧。从宏观看来,此类帧的发送顺序为:

[method] [content header] [content body] [content body]...

method帧后面跟着一个content header帧,content header帧后面跟着0个或多个content body帧。AMQP之所以将数据放在content帧,而没有包含在method帧里,其设计目的是为了支持类似“zero copy”、sendfile()的功能,即,不用解开method帧取出content数据就能得到content内容。而将content的属性(封装在header里)和内容分离,也是为了方便接收端只需根据content header就能决定是否丢弃不需要的content body,提升效率。

content header帧的payload格式如下:

0          2        4           12               14 
+----------+--------+-----------+----------------+------------- - - 
| class-id | weight | body size | property flags | property list... 
+----------+--------+-----------+----------------+------------- - -
    short     short   long long        short         remainder...

其中,class-id表示此帧所属的method帧的class-id。weight值固定为0。body size是其后跟随的所有content body帧的总大小,当它为0时表示未携带任何数据(比如Basic.Publish了一个空字符串)。

3. content body帧。content body帧的payload整个就是一个二进制流,前跟frame header,后跟frame-end。

4. heartbeat帧。心跳帧目的是保证长时间没有数据流通的tcp连接不被关闭。心跳机制工作在AMQP的传输层即可,因此它独立成帧,没有对应着功能层的某个类和方法。心跳帧的间隔由双方在协议协商阶段商定。在具体实现方面,协议规定了:
1) 所有心跳帧的Channel号为0。
2) 当某一端不支持心跳机制但是收到心跳帧时,直接丢弃不能抛异常。
3) 客户端在收到服务器端的Connection.Tune方法后开始发送心跳帧(to server),在发送Connection.Open方法后开始监控心跳帧(from server)。服务器端在收到Connection.Tune-OK方法后开始发送和监控心跳帧(to/from client)。
4) 任何帧都可以当成心跳使用,因此心跳帧只需在连接上没有数据传递期间发送即可。任一端检测到2个心跳周期未收到数据(数据帧或者心跳帧)时,关闭连接。

Summarized from 『AMQP Protocol Specification

--EOF--

ets & dets exercise

1. Mod:module_info(exports) returns a list of all the exported functions in the module Mod. Use this function to find all the exported functions from the Erlang system libraries. Make a key-value lookup table where the key is a {Function,Arity} pair and the value is a module name. Store this data in ETS and DETS tables.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
-module(exercise19).
-export([init/0, ets_lookup/1, dets_lookup/1]).
 
init() ->
  %% init ets table.
  case ets:info(?MODULE) of
    undefined ->
      ets:new(?MODULE, [bag, named_table]);
    _ -> void
  end,
  %% init dets table.
  case dets:open_file(?MODULE, [{file, ?MODULE}, {type, bag}]) of
    {ok, ?MODULE} -> void;
    {error, Reason} -> 
      io:format("dets table ~p init failed, Reason: ~p~n", [?MODULE, Reason])
  end,
  handle_all_loaded(),
  ok.
 
ets_lookup(FunRef) ->
  ets:lookup(?MODULE, FunRef).
 
dets_lookup(FunRef) ->
  dets:lookup(?MODULE, FunRef).
 
 
insert(KV) ->
  ets:insert(?MODULE, KV),
  dets:insert(?MODULE, KV).
 
handle_all_loaded() ->
  [handle_one_mod(Mod) || {Mod, _} <- code:all_loaded()].
 
handle_one_mod(Mod) ->
  Exports = Mod:module_info(exports),
  [insert({X, Mod}) || X <- Exports].
1
2
3
4
5
6
7
8
9
10
11
12
1> exercise19:init().
ok
2> exercise19:ets_lookup({map, 2}).
[{{map,2},orddict},
 {{map,2},lists},
 {{map,2},dict},
 {{map,2},gb_trees}]
3> exercise19:dets_lookup({map, 2}).
[{{map,2},orddict},
 {{map,2},lists},
 {{map,2},dict},
 {{map,2},gb_trees}]

Exercise from 『Programming Erlang (2Edtion)』.

--EOF--

『荒木经惟的天才写真术』

『荒木经惟的天才写真术』天才大师和普通大师的区别在于:普通大师从已有作品中选出较好的进行展出,而天才大师是在收到办影展邀请时再出门拍照,至于题材,无论路人、街景、肖像、色性、佛性、花草等等,左右逢源,最主要的,是给作品一个故事——一个普通大众需要解读才能理解的故事。荒木经惟就是这样的一个天才大师。

一起听听大师的教诲吧:
1. 为了蒙蔽事实,才需要所谓的摄影技巧。或许想拍出真正的好照片,根本就不该卖弄什么技术。现在的照相机,已具备任何我们所需的功能了,如此一来,玩弄所谓的技术,等于是为了把自己或真相隐蔽起来。技巧,是为了蒙蔽观者的眼睛。

2. 照片,是很不负责的。照片自身并不含有确实的陈述,它是随着观看的人而产生变化。无论是对被摄体或照片,想法都会随着拍摄时刻、观看时刻、摄影者或观看者而产生变化。同一张海边的照片,情侣在热恋时一起欣赏的感觉,与分手后看的感觉是完全不同的。

3. 我不主张换镜头,而是要自己后退或前进来接近被摄体。镜头是相机装置的一部分。拍照时若想到更换镜头这件事,其实你已经意识到相机。然而专注拍摄时是不会意识到相机的,因为你已经进入了相机。摄影,要运用自己的身体来拍,眼睛成为相机的镜头。总之,拍照必须自己移动位置,无论如何非移动不可。

4. 所谓挑选照片并公开(发表)这件事,其实也就是在选择不公开(不发表)的照片,所以交出去的照片究竟是不是记录了事实,也很难说,想必涵盖了某些不可公开的真相在内。所谓发表就是必须要说一点慌,因为真实和谎言夹杂在一起才敢发表。选出要发表的照片,也就是在挑选你不想发表的照片。一般读者只能看到少数被挑选出来的照片。荒木经惟的照片中,被社会大众所认同的,是依据那些已公开(发表)的照片来评断的。但是,被选为不公开的那些照片,同样是荒木经惟的作品,两者皆是荒木经惟。

5. 每个人身高不一,所以看世界的角度也不同。高个儿都在看轻这个社会,矮个儿则是很尊敬这个社会。我呢,尽量不由上而下,或由下而上地拍照,这可以说是我的“水平志向”吧,特别是由上往下拍,是绝对不可以的。但是向上也不行,那样会拍出愈来愈多希特勒类型的照片。俯拍或仰拍,会带出不同的摄影内涵,这个步骤就决定一切了。所以我拍照的时候,一定会让视线平行,以水平的角度按下快门,那样是最好的。这也是我地摄影基本宗旨。

6. 摄影本身包含了所谓的谎言和真实,混合了“虚”与“实”。但我不去思考,只是全心全意按快门。因为我本身不具主体性,拍照的主体性在被摄体身上。故事蕴藏在被摄体里。我并不想去了解什么是主观,什么是客观,或许是因为我根本不具备客观性。

7. 无论是绘画或者音乐,那种在兴奋下做出来的东西,不可能会成为什么好作品。必须有另一个自己冷静地从旁观望,一味地陶醉其中是不行的。不过,陶醉其中才能集中精神。所谓的集中,便是一种陶醉。所以,要有另一个自己以客观地态度来分析陶醉中的自己,这,才是摄影的方法。

--EOF--