标签归档:Java

Java IO Stream句柄泄露分析

Java io包封装了常用的I/O操作,流式操作被统一为InputStream、OutputStream、Reader、Writer四个抽象类,其中InputStream和OutputStream为字节流,Reader和Writer为字符流。流式抽象的好处在于程序员无需关心数据如何传输,只需按字节或字符依次操作即可。

在使用流式对象进行操作时,特别是文件操作,使用完毕后千万不能忘记调用流对象的close()方法,这也是老生常谈的话题,但是偏偏很容易忘记,或者没忘记但是使用不当导致close()方法没调用到。正确的做法是把close()方法放在finally块中调用,比如这样:

InputStream in = ...;
OutputStream out = ...;
try {
    doSth(in, out);
} catch (Exception e) {
    handleException();
} finally {
    try {
        in.close();
    } catch (Exception e1){
    }
    try {
        out.close();
    } catch (Exception e2){
    }
}

Java流式操作在便捷了I/O操作的同时,也带来了错误处理上复杂性,这也是Java被人诟病的理由之一。Golang在这块的处理就非常优雅,它提供了defer关键字来简化流程。

当文件流未被显式关闭时,会产生怎样的后果?结果就是会引起文件描述符(fd)耗尽。以下代码会打开一个文件10次,向系统申请了10个fd。

public static void main(String[] args) throws Exception {
    for (int i = 0; i < 10; i++) {
        InputStream is = new FileInputStream(new File("file"));
    }
    System.in.read(b); // pause
}

到/proc/{pid}/fd目录下确定已打开的文件描述符:

root@classa:/proc/16333/fd# ls -l
total 0
lrwx------ 1 root root 64 Aug  2 20:43 0 -> /dev/pts/3
lrwx------ 1 root root 64 Aug  2 20:43 1 -> /dev/pts/3
lr-x------ 1 root root 64 Aug  2 20:43 10 -> /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/sunjce_provider.jar
lr-x------ 1 root root 64 Aug  2 20:43 11 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 12 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 13 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 14 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 15 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 16 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 17 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 18 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 19 -> /root/file
lrwx------ 1 root root 64 Aug  2 20:43 2 -> /dev/pts/3
lr-x------ 1 root root 64 Aug  2 20:43 20 -> /root/file
lr-x------ 1 root root 64 Aug  2 20:43 3 -> /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/rt.jar

但是根据以往经验,即使流未显式关闭,也没见过文件描述符耗尽的情况。这是因为Java文件流式类做了保护措施,FileInputStream和FileOutputStream类利用Java的finalizer机制向GC注册了资源回收的回调函数,当GC回收对象时,实例对象的finalize()方法被调用。以FileInputStream为例,看看它是怎么处理的:

/**
 * Ensures that the close method of this file input stream is
 * called when there are no more references to it.
 *
 * @exception  IOException  if an I/O error occurs.
 * @see        java.io.FileInputStream#close()
 */
protected void finalize() throws IOException {
    if ((fd != null) &&  (fd != FileDescriptor.in)) {

        /*
         * Finalizer should not release the FileDescriptor if another
         * stream is still using it. If the user directly invokes
         * close() then the FileDescriptor is also released.
         */
        runningFinalize.set(Boolean.TRUE);
        try {
            close();
        } finally {
            runningFinalize.set(Boolean.FALSE);
        }
    }
}

当fd未释放时,finalize()方法会调用close()方法关闭文件描述符。有了这一层保障后,即使程序员粗心忘了关闭流,也能保证流最终会被正常关闭了。以下程序可以验证:

public static void main(String[] args) throws Exception {
    for (int i = 0; i < 10; i++) {
        InputStream is = new FileInputStream(new File("file"));
    }
    byte[] b = new byte[2];
    System.in.read(b); // pause1
    System.gc();
    System.in.read(b); // pause2
}

Java运行参数加上GC信息便于观察:

# java -verbose:gc -XX:+PrintGCDetails -Xloggc:gc.log -XX:+PrintGCTimeStamps StreamTest

程序在pause1处打开了10个fd,接着强制通过System.gc()触发一次GC,等gc.log中GC日志输出后再观察/proc/{pid}/fd目录,发现已打开的文件描述符均已经关闭。

但是即便如此,依然存在资源泄漏导致程序无法正常工作的情况,因为JVM规范并未对GC何时被唤起作要求,而对象的finalize()只有在其被回收时才触发一次,因此完全存在以下情况:在两次GC周期之间,文件描述符被耗尽!这个问题曾经在生产环境中出现过的,起因是某同事在原本只需加载一次的文件读取操作写成了每次用户请求加载一次,在一定的并发数下就导致too many open file的异常:

Exception in thread "main" java.io.FileNotFoundException: file (Too many open files)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.(FileInputStream.java:146)
        ......

--EOF--

基于Redis的分布式锁的简易实现

本文讨论的分布式锁适用于集群节点程序的互斥执行,注意此处的“分布式”是指该锁的适用场景是分布式系统,而非锁本身是分布式的,关于锁本身的分布式设计可以参考Redis官方介绍:『Distributed locks with Redis』。

之前在『如何用Spring实现集群环境下的定时任务』一文中提到利用Redis的"GET/SET+TTL"机制实现集群场景下定时任务的互斥执行功能,这实际上就是本文实现的原型。该文描述的方法的问题在于GET和SET方法的组合并非原子操作,在多进程并行执行场景下可能有多个客户端获得锁,从而破坏了锁的安全性。

本文的改进在两个方面:

1. 要解决分布式锁的安全性问题,需要使用Redis提供的锁原语:SETNX(since 1.0.0)或者SET NX EX(since 2.6.12)。这类命令的语义是:如果Key已存在,则返回SET成功,否则返回失败。

2. 使用注解方式加锁和解锁,避免代码重复和耦合。

以下是Java实现(Spring AOP)的原型:

1. 定义注解类,接收expire参数,表示此锁的过期时间。如果在定时任务中使用,一般要大于节点间的时间差,小于定时任务的时间间隔。

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface DLock {
    public String value() default "";

    public String expire() default "30";
}

2. 定义切面类,完成锁的申请和释放逻辑。

@Aspect
@Component
public class DLockAdvice {
    private static String DLOCK_PREFIX = "DLOCK_PREFIX_";

    @Autowired
    private RedisService redisService = null;

    @Around("@annotation(lock)")
    public Object lock(ProceedingJoinPoint pjp, DLock lock) throws Throwable {
        Long expire = Long.valueOf(lock.expire());;
        Signature sig = pjp.getSignature();
        String lockKey = DLOCK_PREFIX + sig.getDeclaringTypeName() + sig.getName() + expire;
        if (redisService.setValueIfAbsent(lockKey, true, expire)) {
            return pjp.proceed();
        } 
        return null;
    }
}

考虑到注解的通用性,锁名称的区分度越大越好,此处采用的“前缀+包名+类名+方法名+过期时间”,因此假如有个方法通过参数个数或者类型不同进行重载,则该锁会对这个重载的每个方法都生效,除非把参数个数或者类型信息加到锁名称里。

另外考虑到Redis服务本身或者网络的不稳定性,需要在RedisService的setValueIfAbsent()方法中对异常进行处理,假如:

1. 能够容忍多个客户端同时获得锁。那么当执行Redis命令异常时返回true。
2. 无法容忍多个客户端同时获得锁,宁愿没有客户端可以获得锁。那么当执行Redis命令异常时返回false。

--EOF--

一次线程同步问题的Bugfix

前两天碰到一个Java多线程未同步引起的Bug。Bug本身并没多少槽点,无非是本该同步的方法没加同步,造成最后执行结果非预期结果,但是在解决Bug的过程中发现还是有些知识点值得记录下来的。

本文分成两部分:1. Bug分析与解决。 2. synchronized注意事项。

1. Bug分析与解决

Watcher

这是出现Bug的业务场景和关键代码。Wathcer Service监听Etcd中产生的事件,然后提交给线程池处理。假设XJob是负责处理相关Etcd事件的类,其业务逻辑如代码所示,问题出在注释3-5处,当Etcd在很短的时间间隔内连续产生同一个操作对象的两个相同事件(该场景是我在设计之初未考虑到)时,会有多个XJob实例(线程)同时执行到注释3所在的区块。由于注释4处的rpc调用是个比较耗时的操作,因此可能会造成某个Entry在第一个线程中尚未写回到DB时,又被第二个线程取出来操作,导致数据不一致。

分析出问题所在后,最简单的方法就是通过synchronized关键字对相关代码进行同步。修复后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run() {
    String id = getId(context);
    ......
    if (condition) {
        ......
    } else {
        synchronized(id.intern()) {
            Entry e = getFromDb(id, DEAD);
            rpc(e);
            update2Db(e, ALIVE);
            ......
        }
    }
}

这里有两个问题需在后面解答。

1). 在run()方法前加synchronized来修复Bug是否可行?

2). 为什么要通过synchronized(id.intern()){}而不是synchronized(id){}?

2. synchronized注意事项

1). synchronized关键字可以修饰方法(类方法/实例方法),也可以修饰代码块。本质上,进入synchronized修饰的区块都会得到一把锁,一块代码到底会不会同步执行,关键就是看多线程竞争的是否同一把锁。如此来理解所谓的对象锁和类锁就容易多了。先说对象锁,顾名思义,锁的粒度是对象。比如当synchronized修饰实例方法,它等同于用synchronized(this){}来修饰这个方法内的代码,也就是说除非是多个线程调用同一个实例(对象)的方法,否则synchronized不生效。以此类推,相同实例(对象)的不同synchronized方法也可以同步执行,因为他们的锁对象都是当前对象地址(this指针)。再说类锁,锁的粒度是类,一般修饰类方法或类变量,或者称为静态(static)方法或静态(static)变量。当synchronized修饰static方法时,它等同于用synchronized(类名.class){}来修饰这个静态方法内的代码。

举个栗子。某Test类有静态方法m1(),非静态方法m2()和m3(),三个方法都被synchronized关键字修饰,现有一个Test类的实例test。两个线程T1和T2,执行m2()和m3()时会同步,因为它们都要获得对象锁test。但是它们可以同时调用m1()和m2(),或者m1()和m3(),因为执行m1()是获得类锁Test.class,而执行m2()/m3()是获得对象锁test。

至此第1部分的两个问题原因显而易见了:

(1). run()方法前加synchronized来修复Bug是否可行?不行。因为提交给线程池执行的XJob对象不是单例的,XJob有很多个对象,不能用对象锁的方式。

(2). 为什么不是直接用synchronized(id){}?因为id对象是运行时生成的,这个String对象肯定分配在堆里。既然分配在堆里,即使id相同(equals()返回true),他们也属于不同的对象,不能用对象锁的方式。再看看String对象的intern()方法做了什么,intern()方法表示返回当前String对象在常量池中的地址,如果常量池中尚未存在该对象的值,那么就会将值放入常量池后再返回其地址。例如,两个String对象s1和s2,如果他们的值相同s1.equals(s2),那么s1.intern()==s2.intern()。当然它也有副作用,比如说频繁调用之后会引起Full GC。关于intern()实际上还有很多要注意的地方,涉及到GC性能,而且在不同的JDK版本下表现还不同,等有时间可以写一篇文章整理一下。

2). 虽然synchronized不可以修饰构造方法,但还是可以通过synchronized(this){}的方式修饰构造方法中代码块,然而这并没有什么意义,synchronized不会生效。想象一下,什么场景下多个线程需要初始化出相同地址的一个对象?如果构造方法中的初始化代码真的需要同步,可以通过类锁或其他的方式,但绝不会是synchronized(this)。

3). 进入synchronized代码块时获得的锁是由JVM负责释放的,这意味着无论代码是正常结束还是异常(checked, unchecked)退出,都不必关心锁未释放的问题。

--EOF--

一次Java内存泄露问题排查

近期随着管理平台用户增加,后台多次出现OutOfMemoryError错误,尝试着把堆内存从2G加到4G(-Xms4096m -Xmx4096m)也不顶用,隔段时间就OOM,看来是出现内存泄露了。于是在Tomcat启动脚本中加入JVM参数-XX:+HeapDumpOnOutOfMemoryError,使其OOM时把堆内存dump成文件,便于分析。今天,程序又一次OOM,我拿到了这个4G大的java_pid32693.hprof文件。打开 Eclipse Memory Analyzer(MAT),导入dump文件,内存泄露问题显而易见:
 Eclipse Memory Analyzer

根据提示,有一个ConcurrentHashMap数据结构占用了所有4G内存中的99.11%,该Map名为asyncContentMap,后面是它的package路径。

回忆了一下,这个貌似是很早以前实现过的一个实例状态自动刷新功能,管理平台有一个实例列表页面,当实例状态变化时,无需用户刷新页面,状态栏自动变更。为了前端通用,放弃Websocket、socket.io,用了最简单的ajax实现,浏览器端和管理平台间用long polling实现状态推送。下图就是整个状态自动刷新功能的示意图,管理平台接到请求后,使用Servlet 3.0的异步Servlet挂住浏览器端连接,并将请求上下文(AsyncContext)放入队列。另一方面,通过worker线程池处理请求队列里的请求,worker线程做的事情就是依次轮询后端服务(Service1,Service2...Service N)接口,判断其实例状态有无变化,如果状态变化,则取出前端请求上下文,返回响应。用户就能从浏览器得知实例状态变更,之后,再发起一个HTTP long polling请求,重复上述流程。当然,long polling有个超时时间,由管理平台控制,当对后端服务连续polling了一定时间都没有状态变化时,也要将响应返回给浏览器,使之进入下一个long polling周期。
Long Polling

要实现上述描述的功能,管理平台这边需维护两个状态,分别对应两个线程安全的数据结构。一个是请求队列LinkedBlockingQueue asyncContextList,worker线程池会从请求队列中取出浏览器请求;另一个是映射表ConcurrentHashMap asyncContentMap,用于存放异步请求和改请求附带的一些元信息,比如上一次polling后端Service的时间戳、响应值等等。根据MAT的提示asyncContentMap出现内存泄露,那么可以肯定是有些异步请求已经完成响应,但是忘了从映射表里把记录清除。检查了一下程序,的确如此,加入remove操作后问题解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
AsyncContext asyncContext = Container.asyncContextList.take();
AsyncContentDeliver asyncDeliver = Container.asyncContentMap.get(asyncContext);
 
//略去其余部分
 
if (respEntry.getResponseBody().equals(lastResponseBody)) {
  // 与上一次polling相比,实例状态未发生改变,异步请求重新入队。
  Container.asyncContextList.put(asyncContext);
  continue;
} else {// 实例状态发生了改变,进行响应。
  try {
    asyncContext.getResponse().setCharacterEncoding("UTF-8");
    asyncContext.getResponse().getWriter().print(respEntry.getResponseBody());
    asyncContext.complete();
    Container.asyncContentMap.remove(asyncContext); // BugFix!
  } catch (IOException e) {
    e.printStackTrace();
    Container.asyncContextList.put(asyncContext);
  }
}

总结这次Bug修复,深深感受到一点:编程语言始终解决不了业务逻辑层面上的内存泄露问题,资源的分配和释放程序员必须谨记在心。

--EOF--

Spring @Async

Spring @Async注解可以将方法异步化处理。与『如何用Spring实现集群环境下的定时任务』提到的@Scheduled注解一样,@Async注解也需要task命名空间驱动,需要配置一个线程池。

一个被@Async注解的异步方法,它的返回值只有两种,要么void,要么返回一个Future类型,Future类型用于追踪异步方法的执行结果。Spring提供了一个Future类型的实现类AsyncResult。

比如以下场景:一个创建实例的HTTP请求,该实例由多个相同的资源组成,这些资源的创建非常耗时,但是相互间没有依赖性,资源创建完成后还要进行一些数据库操作记录实例元信息。这个场景非常适合用@Async注解+Future返回值实现。用@Async注解标注创建实例的Service方法,使得创建实例的过程放在了一个独立线程中执行,此时HTTP请求可以直接响应用户(202 Accepted),并且实例的Location(URI)也可以同时返回。为了进一步提升用户体验,减少实例创建的等待时间,可以将资源的创建分配到多个线程中完成。资源创建的结果(true or false)放进AsyncResult中返回,此处如果不用Future返回值对异步创建资源的过程进行追踪,创建实例的主线程就没法确定资源是否创建成功,则资源的创建只能在主线程中依次进行。

这个过程的大致代码如下:
1. Controller类,接收用户请求。

1
2
3
4
5
6
7
8
9
@Controller
public class InstanceController {
    @RequestMapping(value = "instances", method = RequestMethod.POST)
    public String createInstance(HttpServletResponse response, ModelMap model) {
        //doSth().
        instanceTaskService.createInstance(instance);
        return doResponse(201);
    }
}

2. Instance Service类。异步处理实例创建过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class InstanceTaskService {
    @Async
    public void createInstance(String instanceId) {
        //doSth().
        List<Future<Boolean>> futureList = new ArrayList<Future<Boolean>>();
        for (Resource res : resourceList) {
            // 将每个资源的异步执行结果放入列表。
            futureList.add(resourceTaskService.createResource(res.getId()));
        }
 
        for (Future<Boolean> future : futureList) {
            try {
                //超时判断,避免无限期等待。
                Boolean result = future.get(10, TimeUnit.MINUTES);
                if (!result) {
                    // success.
                } else {
                    throw new Exception("create instance error.");
                }
            } catch (Exception e) {
                rollback();
            }
        }
    }
}

3. Resouce Service类,异步处理资源创建过程。

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class ResouceTaskService {
    @Async
    public Future<Boolean> createRescource(String resId) {
        //doSth().
        if(success()){
            return new AsyncResult<Boolean>(true);
        else{
            return new AsyncResult<Boolean>(false);
        }
    }
}

关键步骤在于createInstance()方法中对每个资源调用future.get()方法,Future类的get()方法是阻塞的:当异步方法已经返回时,调用get()方法立即返回;当异步方法还在执行中时,调用get()方法会被阻塞,直到异步方法执行结束,get(long timeout, TimeUnit unit)方法是get()方法的重载版本,可以指定阻塞的最长时间。示例程序中,创建资源可以并发进行,createInstance()方法中等待所有资源创建成功的时间就是单个资源创建成功的最长时间。

另外有一个需要注意的地方,示例程序中把createResource()方法和createInstance()方法放在了不同的类中,这么做不仅仅是因为代码风格和可读性,而是因为,一个@Async注解的方法,它只能被非本类方法调用时Spring才将其转变为异步方法执行,如果它被本类方法调用,那么无法转变为异步执行。原因在于:@Async注解的实现基于Spring AOP,『Spring AOP实践小结』一文中已有提及,Spring AOP的实现原理是Java动态代理,也就是说Spring会为带@Async注解的方法所在类生成一个代理类,当该方法被调用时,Spring将代理类放到线程池中运行。而发生在本类中的方法调用,则属于目标类的内部调用,代理类无法再参与进去进行异步处理了。

--EOF--