标签归档:消息队列

一种分布式系统消息服务器设计

设计一个分布式系统,首先面临的就是如何解决服务间的通信问题,同步还是异步,是采用基于消息总线的事件驱动架构(EDA)还是分布式服务框架,这在很大程度上决定了系统的可扩展性。

消息服务器的可选择性很多,比如早些年的XMPP服务器,传统的JMS服务器,还有目前比较流行的AMQP消息服务器,简单的优缺点对比如下:

类型 优点 缺点
Openfire (XMPP) 1. 成熟,稳定。
2. 适合做聊天服务,
   在IM领域(Gtalk,网易POPO等)应用广泛。
1. 消息可靠性无保障。
2. 路由策略不够灵活。
3. 集群模式不完善。
4. 协议太重。
ActiveMQ (JMS) 1. 成熟,稳定。
2. 与Java应用契合度高。
1. 路由策略不够灵活。
2. 集群模式不稳定。
RabbitMQ (AMQP) 1. 成熟,稳定。
2. 路由策略灵活。
3. 消息可靠传输。
4. 集群方案成熟。
1. 配置项多,学习和运维成本高。

本文分享一种基于RabbitMQ的消息服务器设计方案:

MQ Server

如上图所示,黄色虚线表示的是分布式系统的一个子服务,假设分别为WEB、REPO和CTRL服务。P表示生产者,C表示消费者。生产者把消息发送到一个topic模式的Exchange (X),根据route key和binding key的运算结果,将消息路由到相应队列,再由队列投递给具体的消费者。

我们将请求从服务调用方的角度分成两类:同步和异步。同步(rpc.call)是指需要应答的请求,比如获取依赖服务的状态。异步(rpc.cast)是指无需应答的请求,比如下发一个命令。

1. 对于同步请求,服务调用方将消息publish到Exchange,根据route key路由到相应队列,投递给服务提供方的消费者处理。服务提供方消费者处理完请求后,将响应内容封装成消息格式,指定route key(TYPETYPE.${HOSTNAME}),将处理结果消息返回。

2. 对于异步请求,服务调用方将消息publish到Exchange,根据route key路由到相应队列,投递给服务提供方的消费者处理。服务提供方消费者处理完请求后无需返回。

无论同步还是异步,服务调用方在发出请求(消息)后会立即返回,不会阻塞。如果是同步请求,那么只需提供回调处理函数,等待响应事件驱动。

每个服务启动后会初始化一条AMQP连接(基于TCP),该连接由3个Channel复用:一个Channel负责生产消息,一个Channel负责从TYPE(REPO/CTRL/WEB等)类型的队列消费消息,一个Channel负责从TYPE.${HOSTNAME}类型的队列消费消息。从队列的角度来看,一个TYPE.${HOSTNAME}类型队列只有一个消费者,一个TYPE类型队列可能有多个消费者。

这样的设计满足以下四类需求:

1. 点对点(P2P)或请求有状态服务:消息的route key设置为TYPE.${HOSTNAME}。比如host1上的WEB服务需要向host2上的REPO服务发送同步请求,只需将消息的route key设置为REPO.host2即可。REPO服务处理完请求后,将响应消息的route key设置为WEB.host1,发送回消息服务器。再比如REPO服务是有状态服务,伸缩性不好做,需要WEB服务做Presharding或者一致性哈希来决定调用哪个REPO服务,也跟上面一样,WEB服务根据计算好的值填充REPO.${HOSTNAME},进行点对点模式的消息通信。

2. 请求无状态服务:如果服务提供方是无状态服务,服务调用方不关心由哪个服务进行响应,那么只需将消息的route key设置为TYPE。比如CTRL是无状态服务,host1上的WEB服务只需将消息的route key设置为CTRL即可。CTRL队列会以Round-robin的调度算法将消息投递给其中的一个消费者。视WEB服务是否无状态而定,CTRL可以选择将响应消息的route key设置为WEB(假设WEB无状态)或者WEB.host1(假设WEB有状态)。

3. 组播:如果服务调用方需要与某类服务的所有节点通信,可以将消息的route key设置为TYPE.*,Exchange会将消息投递到所有TYPE.${HOSTNAME}队列。比如WEB服务需通知所有CTRL服务更新配置,只需将消息的route key设置为CTRL.*

4. 广播:如果服务调用方需要与所有服务的所有节点通信,也就是说对当前系统内所有节点广播消息,可以将消息的route key设置为*.*

本方案优缺点如下:

优点:
1. 路由策略灵活。
2. 支持负载均衡。
3. 支持高可用部署。
4. 支持消息可靠传输(生产者confirm,消费者ack,消息持久化)。
5. 支持prefetch,流控。

缺点:
1. 存在消息重复投递的可能性。
2. 对于多服务协作的场景支持度有限。比如以下场景:WEB服务发送同步请求给CTRL服务,CTRL本身无法提供该服务,需要调用REPO服务,再将REPO服务的响应结果返回给WEB。这个时候就需要CTRL缓存WEB请求,直至REPO响应。
3. 缺少超时管理,错误处理等。

以上列举缺点需要由业务方考虑解决。

顺便再提供本方案的一个简单SDK:Gear

Gear SDK提供的功能包括:
1. 基础组件(Exchange, Binding, Queue, etc)初始化。
2. 连接复用,断线重连。
3. P2P方式,组播,广播消息发送。
4. 异步消息接收。

--EOF--

『大型网站技术架构:核心原理与案例分析』(五)

『大型网站技术架构:核心原理与案例分析』读书笔记系列:
(一):架构演化、模式、要素
(二):高性能架构
(三):高可用架构
(四):可伸缩架构
(五):可扩展架构
(六):安全性架构


『大型网站技术架构』(五):可扩展架构

扩展性和伸缩性:

  • 扩展性(Extensibility): 指对现有系统影响最小的情况下,系统功能可持续扩展或提升的能力。目标是当系统新增功能时,不需要对现有系统的结构和代码进行修改。
  • 伸缩性(Scalability):指系统能够通过增加/减少自身资源规模的方式增强/减少自己计算处理事务的能力。目标是利用集群的方式增加服务器数量,提高系统的整体事务吞吐能力,实现线性伸缩性。

一、构建可扩展的网站架构

终极目标:系统间低耦合。如何分解系统的各个模块、如何定义各个模块接口、如何复用组合不同的模块构造一个完整的系统。

核心思想:模块化,并在此基础上降低模块间耦合性,提高模块复用性。

二、利用分布式消息队列降低系统耦合性

分布式消息队列通过消息对象分解系统耦合性,不同子系统处理同一个消息。

事件驱动架构

定义:事件驱动架构(Event Driven Architecture)通过在低耦合的模块之间传输事件消息,以保持模块的松散耦合,并借助事件消息的通信完成模块间合作。

典型的EDA架构比如生产者消费者模式。利用分布式消息队列的发布-订阅模式工作。生产者只需生产消息到队列,消费者从队列获取消息进行处理。新增业务,只要对某类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响。

三、利用分布式服务打造可复用的业务平台

分布式服务通过接口分解系统耦合性,不同子系统通过相同的接口描述进行服务调用。

巨无霸系统带来的问题:

  • 编译、部署困难
  • 代码分支管理困难:多个团队共同维护一份代码。
  • 数据库连接容易耗尽:数据库连接数与应用数量成正比。
  • 新增业务困难:老人不敢碰,新人不能接。

解决方法:拆分、模块独立部署

  • 纵向拆分:将大应用拆分为多个小应用。
  • 横向拆分:将复用业务拆分出来,独立部署为分布式服务,新增业务只需调用这些分布式服务,不需要依赖具体的模块代码。

Web Service与企业级分布式服务

Web Service: 服务提供者通过WSDL描述服务(接口),客户端通过WSDL生成客户端调用代码,通过SOAP协议与服务提供者通信,传输层协议可以是HTTP、SMTP、TCP等。

缺点:

  • 臃肿的注册与发现机制
  • 低效的XML序列化手段
  • 开销相对较高的HTTP远程通信
  • 复杂的部署与维护手段

大型网站分布式服务的需求

  • 服务注册与发现
  • 负载均衡:支持服务请求者使用可配置的负载均衡算法访问服务。
  • 失效转移
  • 高效的远程通信
  • 整合异构系统
  • 对应用最小侵入:适应服务架构的进化和反复(分布式或集中式部署)。
  • 版本管理:支持服务接口的多版本。
  • 实时监控

分布式服务框架设计

  • Thrift(远程服务调用框架):Facebook用它管理其分布式服务(注册、发现和调用),但是未开源基于Thrift的分布式服务框架。
  • Dubbo:阿里开源的分布式服务框架,较为成熟。

四、可扩展的数据结构

NoSQL: 宽列存储模型、ColumnFamily(列族)设计、面向列族的稀疏矩阵存储格式

五、利用开放平台建设网站生态圈

开放平台架构:

  • API接口:RESTful、Web Service、RPC等。
  • 协议转换:将API输入转成内部服务可识别的形式,将内部服务返回值封装成API格式。
  • 安全:身份识别、权限控制。
  • 审计:监控、计费。
  • 路由:将开放平台访问路由映射到具体的内部服务。
  • 流程:将一组离散的服务组织成一个上下文相关的新服务,隐藏服务细节,提供统一接口。

--EOF--

Erlang运行时之Message Passing

本文试图分析Erlang消息传递机制,总结优缺点,大部分资料来源于论文『Characterizing the Scalability of Erlang VM on Many-core Processors』和stackoverflow,并且带有自己的理解,不当之处需多包涵。

Erlang进程间通信采用的策略是消息传递(Message Passing)。这里的传递是指传递消息副本,而不是传递消息指针或者引用。小消息的复制成本可以忽略,现在的DDR3 SDRAM内存带宽约为2000MT/s,64位数据总线下有近16GB/s的数据传输速度。从VM的角度看,消息传递的本质就是调度器将消息从发送者进程的堆中复制到接收者进程的堆中。基于复制的消息传递有一个好处,可以非常方便实现以进程为单位的GC,因为GC范围是进程私有堆,不必考虑堆中的数据是否被其他进程引用。但是并不是所有消息都是通过复制实现消息传递,大于64K的二进制数据是通过共享内存实现的,前文已有提及。消息太大时,进程GC频繁触发,并且大消息复制成本也无法忽略,这两点带来的性能损耗可能比共享对象引用计数造成的性能损失还要大,因此,Erlang权衡之下对二进制数据采用了不同的消息传递策略。

在多核时代,不同的Erlang进程被不同的调度器执行。当接收进程正被其他调度器执行,而它的堆空间不够或者正有其他消息往堆中发送消息时,发送进程会分配一块临时的Heap Fragment(参考『Erlang运行时之进程』中的图1),用于存储待发送消息的副本,这块Heap Fragment数据会在GC期间被合入接收进程的堆空间。发送进程把消息复制完毕后,生成一个该消息的元数据结构(data management structure),该结构体包含了指向消息副本(处在接收进程堆或者Heap Fragment里)的指针,最后将元数据结构体放入接受进程的消息队列(mailbox)里。如果此时接收进程处于suspended状态,那么它会被激活,处理该消息;如果本来就是运行状态,那么消息会被等待取走进行匹配。

接收进程的mailbox在逻辑上是一个消息队列,但在物理上是通过两个物理队列实现,分别为public queue和private queue。前者对应PCB中的ErlMessageInQueue,只有在支持SMP的erts才有,用于接收其他进程发送来的消息,发送进程操作此队列时需要加锁互斥访问,避免不同进程操作同一块内存;后者对应PCB中的ErlMessageQueue,是接收进程取消息的队列,这样接收进程就可以不必关心锁带来的额外开销了,否则接收进程必须通过获取锁才能安全地读取完整的消息数据。当private queue为空时,public queue的消息会被追加到private queue里。由此可见,mailbox(含public queue和private queue)本身处在PCB中,而不是在堆中。ErlMessageInQueue和ErlMessageQueue的定义(erlang/erts/emulator/beam/erl_message.h)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct {
    ErlMessage* first;
    ErlMessage** last;  /* point to the last next pointer */
    ErlMessage** save;
    int len;            /* queue length */
} ErlMessageQueue;
 
#ifdef ERTS_SMP
typedef struct {
    ErlMessage* first;
    ErlMessage** last;  /* point to the last next pointer */
    int len;            /* queue length */
} ErlMessageInQueue;
#endif

其中,ErlMessageQueue结构体有个save属性,这个save链表就是进程selective recieve时匹配失败后将消息暂存的save queue。

在支持SMP的Erlang虚拟机里,消息传递机制会引入一些额外消耗,这些消耗会在一定程度上影响到CPU核数在erts上的可扩展性:
1. 上文已有提及,分属不同调度器的发送进程向同一个接收进程发送消息时,元数据结构入public queue需要锁去同步。
2. 发送进程要从OS进程的堆空间分配内存供存储消息(或者Heap Fraigment)和元数据使用,这个分配内存的操作需要加锁。同理,内存释放操作也同样需要加锁。
3. 当大量进程并行执行时,他们的消息收发顺序相对于单核模式下是乱序的,虽然不影响最终接收进程的消息匹配,但是会带来损耗,原因是进程的消息接收采用selective receive机制,有些消息会被匹配多次。

Erlang的消息传递机制基于复制。我觉得,通过传递消息副本来避免锁不是Erlang高效的直接原因,因为有变量不变性(immutable variable)保障,即使传递引用也可以避免不必要的锁开销,况且,在支持SMP的运行时环境下,锁作为同步手段也经常使用。但是基于复制带来的好处是方便实现以进程为粒度的GC机制,从而在间接上提升了Erlang的并发性。

--EOF--

RabbitMQ能打开的最大连接数

RabbitMQ自带了显示能够接受的最大连接数,有2种比较直观的方式:
1. rabbitmqctl命令。

1
2
3
4
5
6
7
8
9
10
11
12
n$ rabbitmqctl status
Status of node 'rabbit@10-101-17-13' ...
[{pid,23658},
 ......
 {file_descriptors,
     [{total_limit,924},
      {total_used,10},
      {sockets_limit,829},
      {sockets_used,10}]},
 ......
]
...done.

2. rabbitmq_management WebUI插件。

本文关注当RabbitMQ可用连接数耗尽时客户端的影响以及如何增加最大连接数默认值。

RabbitMQ的socket连接数(socket descriptors)是文件描述符(file descriptors,fd)的一个子集。也就是说,RabbitMQ能同时打开的最大连接数和最大文件句柄数(文件系统,管道)都是受限于操作系统关于文件描述符数量的设置,两者是此消彼长的关系。初始时,可用socket描述符与可用fd数量的比率大概在0.8-0.9左右,这个值并不固定,当socket描述符有剩余时,RabbitMQ会使用尽量多的文件描述符用于磁盘文件读写。随着服务器建立越来越多的socket连接,文件句柄开始回收,数量减少。总之,RabbitMQ会优先将文件描述符用于建立socket连接,宁可牺牲频繁打开/关闭文件带来的磁盘操作性能损耗,这种取舍很容易理解,作为网络服务器,当然优先保障网络吞吐率了。因此,对于高并发连接数的多队列读写时,队列性能会稍微差那么一点,比如用RabbitMQ做RPC。

当服务器建立的socket连接已经达到限制(sockets_limit)时,服务器不再接受新连接。这里要区分清楚,RabbitMQ不再接收的是AMQP连接,而不是传输层的TCP连接,通过简单抓包分析即可清楚流程:

1
2
3
4
5
6
7
8
$ sudo tcpdump host 10.101.17.13 and port 5672
17:24:12.214186 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [S], seq 3319779561, win 65535, options [mss 1368,nop,wscale 5,nop,nop,TS val 1006381554 ecr 0,sackOK,eol], length 0
17:24:12.214231 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [S.], seq 1636058035, ack 3319779562, win 14480, options [mss 1460,sackOK,TS val 24529834 ecr 1006381554,nop,wscale 5], length 0
17:24:12.218795 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [.], ack 1, win 4110, options [nop,nop,TS val 1006381560 ecr 24529834], length 0
17:24:12.243184 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [P.], seq 1:9, ack 1, win 4110, options [nop,nop,TS val 1006381583 ecr 24529834], length 8
17:24:12.243201 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [.], ack 9, win 453, options [nop,nop,TS val 24529841 ecr 1006381583], length 0
17:24:22.247907 IP 10.101.17.166.56925 > 10.101.17.13.amqp: Flags [F.], seq 9, ack 1, win 4110, options [nop,nop,TS val 1006391550 ecr 24529841], length 0
17:24:22.284914 IP 10.101.17.13.amqp > 10.101.17.166.56925: Flags [.], ack 10, win 453, options [nop,nop,TS val 24532352 ecr 1006391550], length 0

line 2-4是TCP握手包,成功建立TCP连接。line 5开始客户端向服务器端发送AMQP协议头字符串“AMQP0091”,共8个字节,开始AMQP握手。line 6是服务器回给客户端的ack包,但未发送AMQP connection.start方法,导致客户端一直等到超时(line 7-8),发送FIN包关闭TCP连接。至此,AMQP连接建立失败。

从客户端(Java SDK)来看上述这个过程,客户端通过ConnectionFactory实例的newConnection()方法创建一条AMQP连接。在网络层,它首先通过java.net.Socket与服务器建立一条TCP连接,发送协议协商字符串“AMQP0091”,然后启动MainLoop线程,通过封装的Frame实例来循环读取帧(readFrame()),注意readFrame()方法可能会有一个SocketTimeoutException的超时异常,这个超时时间是由socket实例setSoTimeout方法写入,默认是10s,由AMQConnection.HANDSHAKE_TIMEOUT常量指定。当超时发生在AMQP连接握手阶段时,就抛出一个SocketTimeoutException异常,发生在其他阶段(除心跳超时)时,什么都不做继续下一个循环:

1
2
3
4
Caused by: java.net.SocketTimeoutException: Timeout during Connection negotiation
 at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection...
 at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:59)
 at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:541)

这里的socket读取超时很容易跟连接超时搞混。连接超时由ConnectionFactory实例的setConnectionTimeout()方法指定,对应着网络层Socket实例connect()方法中的timeout参数,指的是完成TCP三次握手的超时时间;而读取超时是从socket中读取字节流的等待时间,前文已经说过,由Socket实例的setSoTimeout()指定。这在各种网络库中应该很常见,比如HttpClient。

私以为这种情况下更好的设计是应该由RabbitMQ主动断开与客户端的TCP连接,减少客户端等待时间。

最后一个问题,如何增加RabbitMQ的能够同时打开的连接数。通过前文可知,最大并发连接数由此进程可打开的最大文件描述符数量(乘以一个比例系数)决定,因此只要增加单个进程可打开的文件描述符数量即可。有几个常规方法,按作用范围可以归纳为以下几类:

1. 进程级别。在启动脚本rabbitmq-server中加入ulimit -n 10240命令(假设将最大文件描述符数量设置为10240,下同),相当于在shell中执行,由此shell进程fork出来的进程都能继承这个配置。

2. 用户级别。修改/etc/security/limits.conf文件,添加以下配置,重新登录生效:

1
2
user    soft    nofile    10240
user    hard    nofile    10240

3. 系统级别。

1
# echo 10240 > /proc/sys/fs/file-max

上述设置只是针对proc文件系统,相当于修改了操作系统的运行时参数,重启后失效。要想永久生效,需要修改/etc/sysctl.conf文件,加入配置项fs.file-max=10240。

一个进程能打开的最大文件描述符数量受限于上述三个级别配置中的最小值。理论上,系统级别的配置数值必须要大于用户级别,用户级别的要大于进程级别的,只有这样配置才是安全的,否则进程容易因为打开文件数量问题受到来自操作系统的种种限制。操作系统为什么要限制可打开的文件描述符数量?为了系统安全。因为文件描述符本质上是一种内存中的数据结构,如果不加以限制,很容易被进程无意或恶意耗尽内存,比如fork bomb

--EOF--

RabbitMQ消息过期时间

RabbitMQ针对队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。

1. 设置队列属性。
通过队列属性设置消息TTL的方法是在queue.declare方法中加入x-message-ttl参数,单位为ms。
SDK设置如下:

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

HTTP接口调用如下:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT 
-d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}'
http://localhost:15672/api/queues/{vhost}/{queuename}

如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分代替RabbitMQ 3.0.0以前支持的immediate参数,之所以说部分代替,是因为immediate参数在投递失败会有basic.return方法将消息体返回,详见『AMQP协议mandatory和immediate标志位区别』

2. 设置消息属性。
针对每条消息设置TTL的方法是在basic.publish方法中加入expiration的属性参数,单位为ms。
SDK设置如下:

1
2
3
4
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("myexchange", "routingkey", properties, messageBodyBytes);

HTTP接口调用如下:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST 
-d'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my 
body","payload_encoding":"string"}' 
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而第二种方法里,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到消费者之前判定的,为什么两者得处理方法不一致?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。

--EOF--