月度归档:2015年06月

Qmeta: RabbitMQ元数据备份工具

RabbitMQ的元数据泛指服务器端的Exchange、Binding、Queue、Vhost、Policy、账户、权限等定义信息。对元数据进行备份有两个好处:

1. 队列迁移。可以直接导入,不必所有信息重新输入。
2. 故障恢复。『单机磁盘故障引发RabbitMQ镜像队列数据丢失』一文中分析了镜像队列数据丢失的场景,如果元数据进行过备份的话,可以快速恢复服务。

RabbitMQ管理插件本身提供了元数据备份的API(GET /api/definitions),关键是如何高效地备份,特别是当手上有上百个RabbitMQ节点需要运维的时候。

Qmeta是我用Go写的一个RabbitMQ元数据备份小工具,它支持批量备份元数据,并以json的形式存储在备份目录下。

使用方式:
1. 构建

$ git clone git@github.com:fengchj/ftool.git
$ cd ftool/qmeta
$ go build 

2. 使用

$ qmeta -h
Usage of qmeta:
  -file="config": config file contains RabbitMQ node addrs.
$ cat config
127.0.0.1:15672 guest guest
10.20.30.40:15672 guest guest
$ qmeta -file config
Host 127.0.0.1 backup done!
Host 10.20.30.40 backup done!
$ cd qmeta_{time-in-second}
$ ls -l
127.0.0.1.json
10.20.30.40.json

简单介绍下Qmeta的实现:

1. flag包解析配置文件路径。
2. 每个节点的备份任务交给独立的goroutine处理。
3. net/http包发送HTTP GET请求,设置连接超时参数Timeout。
4. 所有goroutine共享http.Client实例,协程安全。
5. 使用带缓冲区的channel来实现主程序和goroutine的同步,当所有goroutine返回后,安全关闭channel。
6. 协程在defer语句中进行channel数据写入,以此来保证是否备份成功,goroutine都会返回。

--EOF--

一次线程同步问题的Bugfix

前两天碰到一个Java多线程未同步引起的Bug。Bug本身并没多少槽点,无非是本该同步的方法没加同步,造成最后执行结果非预期结果,但是在解决Bug的过程中发现还是有些知识点值得记录下来的。

本文分成两部分:1. Bug分析与解决。 2. synchronized注意事项。

1. Bug分析与解决

Watcher

这是出现Bug的业务场景和关键代码。Wathcer Service监听Etcd中产生的事件,然后提交给线程池处理。假设XJob是负责处理相关Etcd事件的类,其业务逻辑如代码所示,问题出在注释3-5处,当Etcd在很短的时间间隔内连续产生同一个操作对象的两个相同事件(该场景是我在设计之初未考虑到)时,会有多个XJob实例(线程)同时执行到注释3所在的区块。由于注释4处的rpc调用是个比较耗时的操作,因此可能会造成某个Entry在第一个线程中尚未写回到DB时,又被第二个线程取出来操作,导致数据不一致。

分析出问题所在后,最简单的方法就是通过synchronized关键字对相关代码进行同步。修复后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run() {
    String id = getId(context);
    ......
    if (condition) {
        ......
    } else {
        synchronized(id.intern()) {
            Entry e = getFromDb(id, DEAD);
            rpc(e);
            update2Db(e, ALIVE);
            ......
        }
    }
}

这里有两个问题需在后面解答。

1). 在run()方法前加synchronized来修复Bug是否可行?

2). 为什么要通过synchronized(id.intern()){}而不是synchronized(id){}?

2. synchronized注意事项

1). synchronized关键字可以修饰方法(类方法/实例方法),也可以修饰代码块。本质上,进入synchronized修饰的区块都会得到一把锁,一块代码到底会不会同步执行,关键就是看多线程竞争的是否同一把锁。如此来理解所谓的对象锁和类锁就容易多了。先说对象锁,顾名思义,锁的粒度是对象。比如当synchronized修饰实例方法,它等同于用synchronized(this){}来修饰这个方法内的代码,也就是说除非是多个线程调用同一个实例(对象)的方法,否则synchronized不生效。以此类推,相同实例(对象)的不同synchronized方法也可以同步执行,因为他们的锁对象都是当前对象地址(this指针)。再说类锁,锁的粒度是类,一般修饰类方法或类变量,或者称为静态(static)方法或静态(static)变量。当synchronized修饰static方法时,它等同于用synchronized(类名.class){}来修饰这个静态方法内的代码。

举个栗子。某Test类有静态方法m1(),非静态方法m2()和m3(),三个方法都被synchronized关键字修饰,现有一个Test类的实例test。两个线程T1和T2,执行m2()和m3()时会同步,因为它们都要获得对象锁test。但是它们可以同时调用m1()和m2(),或者m1()和m3(),因为执行m1()是获得类锁Test.class,而执行m2()/m3()是获得对象锁test。

至此第1部分的两个问题原因显而易见了:

(1). run()方法前加synchronized来修复Bug是否可行?不行。因为提交给线程池执行的XJob对象不是单例的,XJob有很多个对象,不能用对象锁的方式。

(2). 为什么不是直接用synchronized(id){}?因为id对象是运行时生成的,这个String对象肯定分配在堆里。既然分配在堆里,即使id相同(equals()返回true),他们也属于不同的对象,不能用对象锁的方式。再看看String对象的intern()方法做了什么,intern()方法表示返回当前String对象在常量池中的地址,如果常量池中尚未存在该对象的值,那么就会将值放入常量池后再返回其地址。例如,两个String对象s1和s2,如果他们的值相同s1.equals(s2),那么s1.intern()==s2.intern()。当然它也有副作用,比如说频繁调用之后会引起Full GC。关于intern()实际上还有很多要注意的地方,涉及到GC性能,而且在不同的JDK版本下表现还不同,等有时间可以写一篇文章整理一下。

2). 虽然synchronized不可以修饰构造方法,但还是可以通过synchronized(this){}的方式修饰构造方法中代码块,然而这并没有什么意义,synchronized不会生效。想象一下,什么场景下多个线程需要初始化出相同地址的一个对象?如果构造方法中的初始化代码真的需要同步,可以通过类锁或其他的方式,但绝不会是synchronized(this)。

3). 进入synchronized代码块时获得的锁是由JVM负责释放的,这意味着无论代码是正常结束还是异常(checked, unchecked)退出,都不必关心锁未释放的问题。

--EOF--

Erlang进程GC引发RabbitMQ Crash

生产环境下曾数次出现过RabbitMQ异常崩溃的场景,好在镜像队列的部署方式对服务未产生影响。

我们的RabbitMQ部署环境是:

KVM: 2 VCPU, x86_64, 4G RAM, swap disabled。
OS: Debian 7.0, Linux Kernel 3.2.0-4-amd64
Erlang: R15B01 (erts-5.9.1) [64-bit] [smp:2:2]
RabbitMQ: 3.1.5, {vm_memory_high_watermark,0.4}, {vm_memory_limit,1663611699}

分析最近一次崩溃产生的erl_crash.dump文件:

=erl_crash_dump:0.1
Tue Jun 16 00:46:49 2015
Slogan: eheap_alloc: Cannot allocate 1824525600 bytes of memory (of type "old_heap").
System version: Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:2:2] [async-threads:30] [kernel-poll:true]
Compiled: Sun Jan 27 18:19:34 2013
Taints:
Atoms: 22764
=memory
total: 1980253120
processes: 1563398383
processes_used: 1563397555
system: 416854737
atom: 703377
atom_used: 674881
binary: 385608216
code: 18475052
ets: 7643488
......

发现Crash原因是:Cannot allocate 1824525600 bytes of memory (of type "old_heap")。从数据来看,Erlang虚拟机已占用约1.98G内存(其中分配给Erlang进程的占1.56G),此时仍向操作系统申请1.82G,因为操作系统本身以及其他服务也占用一些内存,当前系统已经分不出足够的内存了,所以Erlang虚拟机崩溃。

此处有两个不符合预期的数据:
1. vm_memory_limit控制在1.67G左右,为什么崩溃时显示占用了1.98G?
2. 为什么Erlang虚拟机会额外再申请1.82G内存?

经过一些排查和总结,发现几次崩溃都是出现在队列中消息堆积较多触发了流控或者有大量unack消息requeue操作的时候,基本把原因确定为Erlang对进程进行Major GC时系统内存不足上。

在之前的『RabbitMQ与Erlang』一文中,曾简单介绍过Erlang的软实时特性,其中按进程GC是实现软实时的一个重要手段:

RabbitMQ将每个队列设计为一个Erlang进程,由于进程GC也是采用分代策略,当新老生代一起参与Major GC时,Erlang虚拟机会新开内存,根据root set将存活的对象拷贝至新空间,这个过程会造成新老内存空间同时存在,极端情况下,一个队列可能短期内需要两倍的内存占用量。

这也是RabbitMQ将内存流控的安全阈值设置为0.4的原因,即使double,也是0.8,还是安全的。0.4的意思是,当broker的内存使用量大于40%时,开始进行生产者流控,但是该参数并不承诺broker的内存使用率不大于40%。官方文档也强调过这一点:

The default memory threshold is set to 40% of installed RAM. Note that this does not prevent the RabbitMQ server from using more than 40%, it is merely the point at which publishers are throttled. Erlang's garbage collector can, in the worst case, cause double the amount of memory to be used (by default, 80% of RAM).

实际上通过RabbitMQ运行日志很容易证实,broker的实际内存使用量可以远远大于vm_memory_high_watermark设定值,比如:

$ less /var/log/rabbitmq/rabbit@10.xx.xx.xx.log
=INFO REPORT==== 16-Jun-2015::00:46:25 ===
vm_memory_high_watermark set. Memory used:2497352280 allowed:1663611699
=WARNING REPORT==== 16-Jun-2015::00:46:25 ===
memory resource limit alarm set on node 'rabbit@10.xx.xx.xx'.

内存流控阈值掐在1.6G,但是实际使用了近2.5G!再加上操作系统本身和其他服务吃掉的内存,轻松超过3G。如果此时触发Erlang进程Major GC,需要占用双倍的当前堆内存大小,那么报本文开头的old_heap堆内存分配错误也就不足为奇了。

知道了问题所在,如何解决这个问题呢?

遗憾的是,我目前也没有确切的解决方法(如有人知道,烦请告知)。但是从问题的形成原因来看,至少从以下几个方面可以显著降低问题出现概率。

1. RabbitMQ独立部署,不与其他Server共享内存资源。
2. 进一步降低vm_memory_high_watermark值,比如设置成0.3,但是这种方式会造成内存资源利用率太低。
3. 开启swap,问题在于容易造成性能和吞吐量恶化。
4. 升级RabbitMQ至新版(3.4+)。我个人觉得这个问题的根本原因在于RabbitMQ对内存的管理上,特别是早期版本,Matthew(RabbitMQ作者之一)也曾谈到过这个问题。尽管设计之初就考虑了各种优化,使得队列进程私有堆尽量小,比如当触发了某些条件,会page out到磁盘,或以二进制数据的方式存储在共享数据区,但是即便如此,在某些特定时刻,队列进程私有堆仍会消耗大量内存,这点从Management Plugin或者Remote shell里都可以看出来,而每次的“Cannot allocate old_heap”问题恰恰也总是出现在这些时刻。RabbitMQ 3.4版本发布后,官方发布了一篇新版内存管理方面的博客,从介绍上来看,要准确计算和控制当前系统使用的内存量确实非常困难。所以作者的感慨有理:

Of course, this still doesn't give a perfect count of how much memory is in use by a queue; such a thing is probably impossible in a dynamic system. But it gets us much closer.

--EOF--

Linux JRE中文字体支持

默认情况下,无论Oracle JDK还是OpenJDK,Linux JRE都不提供中文字体支持,这会给那些AWT应用开发者或者基于AWT实现的图表库(jfreechart等)使用者带来一些困扰,本来应该显示中文的地方都被方框代替:

fonts

解决方法可以很简单:

1. 拷贝中文字体到JRE目录。以宋体为例,从一台含中文字体的机器上(Mac下字体在/library/fonts目录,Windows下字体在C:\Windows\Fonts目录)拷贝SimSun.ttf文件到目标机器的$JAVA_HOME/jre/lib/fonts。
2. 重启应用(JVM)。

下面这段程序可以查看当前JRE环境支持哪些字体:

import java.awt.Font;
import java.awt.GraphicsEnvironment;

public class FontTest {

    public static void main(String[] args) {
        Font[] fonts = GraphicsEnvironment
                        .getLocalGraphicsEnvironment().getAllFonts();
        for (Font f : fonts) {
            System.out.println("Name:" + f.getFontName());
        }
    }
}

--EOF--

一种分布式系统消息服务器设计

设计一个分布式系统,首先面临的就是如何解决服务间的通信问题,同步还是异步,是采用基于消息总线的事件驱动架构(EDA)还是分布式服务框架,这在很大程度上决定了系统的可扩展性。

消息服务器的可选择性很多,比如早些年的XMPP服务器,传统的JMS服务器,还有目前比较流行的AMQP消息服务器,简单的优缺点对比如下:

类型 优点 缺点
Openfire (XMPP) 1. 成熟,稳定。
2. 适合做聊天服务,
   在IM领域(Gtalk,网易POPO等)应用广泛。
1. 消息可靠性无保障。
2. 路由策略不够灵活。
3. 集群模式不完善。
4. 协议太重。
ActiveMQ (JMS) 1. 成熟,稳定。
2. 与Java应用契合度高。
1. 路由策略不够灵活。
2. 集群模式不稳定。
RabbitMQ (AMQP) 1. 成熟,稳定。
2. 路由策略灵活。
3. 消息可靠传输。
4. 集群方案成熟。
1. 配置项多,学习和运维成本高。

本文分享一种基于RabbitMQ的消息服务器设计方案:

MQ Server

如上图所示,黄色虚线表示的是分布式系统的一个子服务,假设分别为WEB、REPO和CTRL服务。P表示生产者,C表示消费者。生产者把消息发送到一个topic模式的Exchange (X),根据route key和binding key的运算结果,将消息路由到相应队列,再由队列投递给具体的消费者。

我们将请求从服务调用方的角度分成两类:同步和异步。同步(rpc.call)是指需要应答的请求,比如获取依赖服务的状态。异步(rpc.cast)是指无需应答的请求,比如下发一个命令。

1. 对于同步请求,服务调用方将消息publish到Exchange,根据route key路由到相应队列,投递给服务提供方的消费者处理。服务提供方消费者处理完请求后,将响应内容封装成消息格式,指定route key(TYPETYPE.${HOSTNAME}),将处理结果消息返回。

2. 对于异步请求,服务调用方将消息publish到Exchange,根据route key路由到相应队列,投递给服务提供方的消费者处理。服务提供方消费者处理完请求后无需返回。

无论同步还是异步,服务调用方在发出请求(消息)后会立即返回,不会阻塞。如果是同步请求,那么只需提供回调处理函数,等待响应事件驱动。

每个服务启动后会初始化一条AMQP连接(基于TCP),该连接由3个Channel复用:一个Channel负责生产消息,一个Channel负责从TYPE(REPO/CTRL/WEB等)类型的队列消费消息,一个Channel负责从TYPE.${HOSTNAME}类型的队列消费消息。从队列的角度来看,一个TYPE.${HOSTNAME}类型队列只有一个消费者,一个TYPE类型队列可能有多个消费者。

这样的设计满足以下四类需求:

1. 点对点(P2P)或请求有状态服务:消息的route key设置为TYPE.${HOSTNAME}。比如host1上的WEB服务需要向host2上的REPO服务发送同步请求,只需将消息的route key设置为REPO.host2即可。REPO服务处理完请求后,将响应消息的route key设置为WEB.host1,发送回消息服务器。再比如REPO服务是有状态服务,伸缩性不好做,需要WEB服务做Presharding或者一致性哈希来决定调用哪个REPO服务,也跟上面一样,WEB服务根据计算好的值填充REPO.${HOSTNAME},进行点对点模式的消息通信。

2. 请求无状态服务:如果服务提供方是无状态服务,服务调用方不关心由哪个服务进行响应,那么只需将消息的route key设置为TYPE。比如CTRL是无状态服务,host1上的WEB服务只需将消息的route key设置为CTRL即可。CTRL队列会以Round-robin的调度算法将消息投递给其中的一个消费者。视WEB服务是否无状态而定,CTRL可以选择将响应消息的route key设置为WEB(假设WEB无状态)或者WEB.host1(假设WEB有状态)。

3. 组播:如果服务调用方需要与某类服务的所有节点通信,可以将消息的route key设置为TYPE.*,Exchange会将消息投递到所有TYPE.${HOSTNAME}队列。比如WEB服务需通知所有CTRL服务更新配置,只需将消息的route key设置为CTRL.*

4. 广播:如果服务调用方需要与所有服务的所有节点通信,也就是说对当前系统内所有节点广播消息,可以将消息的route key设置为*.*

本方案优缺点如下:

优点:
1. 路由策略灵活。
2. 支持负载均衡。
3. 支持高可用部署。
4. 支持消息可靠传输(生产者confirm,消费者ack,消息持久化)。
5. 支持prefetch,流控。

缺点:
1. 存在消息重复投递的可能性。
2. 对于多服务协作的场景支持度有限。比如以下场景:WEB服务发送同步请求给CTRL服务,CTRL本身无法提供该服务,需要调用REPO服务,再将REPO服务的响应结果返回给WEB。这个时候就需要CTRL缓存WEB请求,直至REPO响应。
3. 缺少超时管理,错误处理等。

以上列举缺点需要由业务方考虑解决。

顺便再提供本方案的一个简单SDK:Gear

Gear SDK提供的功能包括:
1. 基础组件(Exchange, Binding, Queue, etc)初始化。
2. 连接复用,断线重连。
3. P2P方式,组播,广播消息发送。
4. 异步消息接收。

--EOF--