标签归档:数据库

如何用Spring实现集群环境下的定时任务

定时任务的实现方式有多种,例如JDK自带的Timer+TimerTask方式,Spring 3.0以后的调度任务(Scheduled Task),Quartz等。

Timer+TimerTask是最基本的解决方案,但是比较远古了,这里不再讨论。Spring自带的Scheduled Task是一个轻量级的定时任务调度器,支持固定时间(支持cron表达式)和固定时间间隔调度任务,支持线程池管理。以上两种方式有一个共同的缺点,那就是应用服务器集群下会出现任务多次被调度执行的情况,因为集群的节点之间是不会共享任务信息的,每个节点上的任务都会按时执行。Quartz是一个功能完善的任务调度框架,特别牛叉的是它支持集群环境下的任务调度,当然代价也很大,需要将任务调度状态序列化到数据库。Quartz框架需要10多张表协同,配置繁多,令人望而却步...

经过折中考虑,还是选择了Spring的Scheduled Task来实现定时任务。如下:

1. Spring配置文件application-context.xml中添加task命名空间和描述。

1
2
3
4
5
6
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/task
	http://www.springframework.org/schema/task/spring-task.xsd">

2. 添加调度器和线程池声明。

1
2
<task:executor id="taskExecutor" pool-size="10" />
<task:annotation-driven executor="taskExecutor" />

3. 实现调度方法。基本结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
package com.netease.yx.service;
 
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
 
@Service
public class ScheduledService {
    @Scheduled(cron = "0 0 5 * * *")
    public void build() {
       System.out.println("Scheduled Task");
    }
}

@Scheduled注解支持秒级的cron表达式,上述声明表示每天5点执行build任务。

前文已经提过,这种方式在单台应用服务器上运行没有问题,但是在集群环境下,会造成build任务在5点的时候运行多次,遗憾的是,Scheduled Task在框架层面没有相应的解决方案,只能靠程序员在应用级别进行控制。

如何控制?

1. 无非是一个任务互斥访问的问题,声明一把全局的“锁”作为互斥量,哪个应用服务器拿到这把“锁”,就有执行任务的权利,未拿到“锁”的应用服务器不进行任何任务相关的操作。
2.这把“锁”最好还能在下次任务执行时间点前失效。

在项目中我将这个互斥量放在了redis缓存里,1小时过期,这个过期时间是由任务调度的间隔时间决定的,只要小于两次任务执行时间差,大于集群间应用服务器的时间差即可。

完整定时任务类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.netease.yx.service;
 
import javax.annotation.Resource;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.netease.yx.service.ICacheService;
 
@Service
public class ScheduledService {
    @Resource
    private ICacheService cache = null;
 
    private static String CACHE_LOCK = "cache_lock";
 
    private static int EXPIRE_PERIOD = (int)DateUtils.MILLIS_PER_HOUR / 1000;
 
    @Scheduled(cron = "0 0 5 * * *")
    public void build() {
        if (cache.get(CACHE_LOCK) == null) {
            cache.set(CACHE_LOCK, true, EXPIRE_PERIOD);
            doJob();
        }
    }
}

--EOF--

一种"无状态"的竞猜游戏设计

场景如下:某个电视节目,共6个选手,每期节目有一个或多个选手为获胜者。现要给这个节目开发一个与节目同步的竞猜游戏,观众参与竞猜谁是节目的获胜者,答对的有红包奖励。节目每周一期,周六21点播出,22:30结束。周四10点至周六22点为观众竞猜时间,周六22点至22:30为等待开奖时间,周六22:30至下个周四10点为领奖时间。然后下个节目周期开始,依此循环。

上述节目有三个时间点,开始竞猜时间(结束领奖时间,周四10点),停止竞猜时间(周六22点),开奖时间(周六22:30),这三个时间点前后会有较大的访问量,其余时间则较为平缓。一种设计方法是给每个用户保存一个状态,定时器在上述三个时间点修改用户的状态,这就是一种有状态的实现方式,优点是直接,用户访问页面后查一次数据库即可知道状态,显示对应的视图即可。缺点是三个时间点附近有大量的数据库读写,特别是开奖时间前后会涌入大量用户,应用服务器一边要等待数据库IO,处理缓存更新,一边要处理用户请求,会给响应时间带来一定影响。而且定时更新状态的任务在应用服务器集群下还要附加额外逻辑,避免不同服务器更新多次用户状态。

另外有一种无状态的设计方式,这里的无状态是指不用将用户的当前状态存储到数据库中。在用户访问应用之前,他的状态是未知的,当用户访问页面后,服务器根据一些参照值实时运算出该用户的当前状态。这种实现方式避免了第一种实现方式的两个缺点,首先三个时间点上不再有涉及全表的数据库操作,它将集中的状态更新操作平摊到每次用户访问页面时,其次应用支持简单的水平扩展。由于需要实时运算用户状态,这种实现方式会额外消耗CPU时间。从业务场景的特点来看,这种交换是值得的。

主要的实现方式比较简单:

1. 定义状态类型,定义以下8种状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 领奖期间的状态。周六22:30至下个周四10:00。
public static final int STATUS_PRIZE_NO_PARTI = 0; // 未参与
public static final int STATUS_PRIZE_PARTI_WIN_DRAW = 1; // 参与,压中,已领奖
public static final int STATUS_PRIZE_PARTI_WIN_NO_DRAW = 2; // 参与,压中,未领奖
public static final int STATUS_PRIZE_PARTI_NO_WIN = 3; // 参与,未压中
 
// 竞猜阶段的状态。周四10:00至周六22:00。
public static final int STATUS_GUESSING_CHOOSE = 4; // 已选择
public static final int STATUS_GUESSING_NO_CHOOSE = 5; // 未选择
 
// 节目播放阶段的状态。周六22:00至周六22:30。
public static final int STATUS_BROADCAST_CHOOSE = 6; // 已选择
public static final int STATUS_BROADCAST_NO_CHOOSE = 7; // 未选择

2. 用户访问页面后,以当期节目的播出时间为参照点,根据相对时间进行运算,确定3个时间点。确定当前时间所在的区间,再根据其他信息确定当前用户的状态,返回即可。

--EOF--

DBCP预检测连接有效性

今天碰到一个SQL查询操作,后端MySQL驱动抛出一个CommunicationsException的异常:

1
2
3
4
5
6
7
8
9
10
11
org.springframework.dao.RecoverableDataAccessException: 
### Error querying database.  
Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: 
The last packet successfully received from the server 
was 57,695,440 milliseconds ago.  
The last packet sent successfully to the server was 57,695,441 milliseconds ago. 
is longer than the server configured value of 'wait_timeout'. 
You should consider either expiring and/or testing connection validity 
before use in your application, 
increasing the server configured values for client timeouts, or using 
the Connector/J connection property 'autoReconnect=true' to avoid this problem.

看这个提示很眼熟,以前也碰到过,起因是DBCP连接池在执行SQL语句时未验证连接有效性。本文就此问题的相关知识和解决方法做个总结吧。

数据库为提升利用率一般会回收长时间空闲的连接。MySQL中默认这个时间为28800s,也就是8小时,由wait_timeout参数指定:

1
show global variables like 'wait_timeout';

DBCP连接池的基本思路一次性创建多个数据库连接,当上层应用需要执行SQL时,从连接池中取出一条空闲连接,执行完之后将连接归还连接池,这样避免了数据库连接的重复创建和销毁。当连接池中的连接长时间处于空闲状态时,连接另一头的MySQL会根据设置的wait_timeout值大小回收这些连接,但是DBCP对连接被回收却不知情,因此,只要应用有请求,连接池就会把空闲连接分配出去,尽管这已是一条半闭合的连接,这就造成了通过这条连接执行的SQL语句运行失败,也就触发了数据库驱动层抛出CommunicationsException。

DBCP提供多种方式解决这一问题,有文章已经总结过了,其实最简单、最有效的方法就是利用DBCP提供的validationQuery参数进行连接的预检测,它会在与连接池交互的过程中加入一些钩子,定点执行validationQuery指定的SQL语句,如果SQL语句执行成功,表示此连接有效,分配给应用,如果执行失败,则丢弃此连接,这种方法还能应对网络故障等问题造成的MySQL连接失效问题。其他的一些方法,比如空闲连接检测,乐观获取连接等方式,都无法保证完全对应用透明,应用还是能感知到数据库操作失败。

DBCP配置项中,与validationQuery相关的有validationQuery,testOnBorrow,testOnReturn,testWhileIdle等几个,详见官方文档。validationQuery要求必需是个SELECT类型的SQL语句,至少返回一行,由于它会在所有应用的SQL语句执行之前运行一次,所以原则上应该对数据库服务器带来的压力越小越好,在不同类型的数据库中有不同的推荐值,这里有篇文章对此进行了总结,MySQL推荐使用“SELECT 1”。testXXX系列的配置参数用来指定运行validationQuery的时机,比如testOnBorrow设为true表示从连接池中获取连接前运行validationQuery,testOnReturn设为true表示将连接归还连接池前运行validationQuery,testWhileIdle要与timeBetweenEvictionRunsMillis、numTestsPerEvictionRun等参数配合使用,只要设置得当,DBCP会定期对连接池中的空闲连接进行有效性验证。默认情况下,testOnBorrow设置为true,testOnReturn和testWhileIdle设置为false。因此,支持数据库连接预检测的Spring DBCP连接池参数可以简化为以下配置:

1
2
3
4
5
6
7
8
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
    destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://10.10.120.16:3306/db" />
    <property name="username" value="username" />
    <property name="password" value="password" />
    <property name="validationQuery" value="SELECT 1" />
</bean>

关于使用validationQuery参数来预检测连接有效性的缺陷,前文已有提及,它肯定会带来数据库使用效率的降低,特别是大量短连接的场景中。一般来说,局域网中执行一条“SELECT 1”的耗时约为1~5ms,如果能接受这种程度的损耗,那么推荐使用预检测连接的方式来解决连接池中连接失效的问题,它很粗暴但是最有效。

--EOF--

一种缓存服务器宕机的应对之术

假如有一个认证服务应用,作为基础服务需要尽量保证高可用状态,应用服务器层面已经通过Nginx+keepalived实现主备热切换,但是在内部服务中,由于考虑复杂度等因素,缓存服务器和数据库服务器都是单节点的,也就是说多个认证服务应用(Tomcat)共用一个memcached缓存服务器和数据库服务器。

认证服务器的作用是接收用户请求及附带的key、StringToSign、摘要信息等参数,然后判定此请求是否合法。为了TPS能得到保障,认证服务器会将验证签名用的用户信息缓存在memcached里,避免每次从数据库中读取。

程序的流程可以很简单,分三步走:
1. 从缓存中读取数据。如果读到数据,立即返回。否则,进到第2步。
2. 从数据库中读取数据。
3. 将数据写入缓存。返回。

如果缓存服务器运行正常,这个过程没什么问题。但是一旦单节点的缓存宕掉,而服务的访问量又很高时,上面流程就会出问题了。最坏情况下,三个步骤中有两个步骤要跟缓存打交道。

因为,程序会卡在第一步从缓存读取数据,直到抛出读缓存超时异常,这个时间可短可长,但不管怎样TPS都会急剧恶化。这还没完,程序在第二步从数据库读出数据后,进行了一次写缓存的操作,一样的原因,程序会等到超时后返回。这一来性能还远不如直接从数据库中读取数据。实际上,即使不使用缓存,数据库也能撑住这些查询请求,只是TPS会下降%50左右,不会造成缓存雪崩。

其实经过简单的优化,上述过程就可以变得合理。至少,第二次写缓存操作可以省略。另外,当缓存出问题时,可以认为较短的时间内它都不会恢复。因此,我采用的策略是:

程序中维护一个上次缓存服务器故障时间戳的全局变量,同时指定一个缓存故障恢复期。每次在读写操作缓存前,都判断一下当前时间戳减去上次缓存服务器故障时间戳是否大于缓存故障恢复期,如果小于缓存故障恢复期,则认为当前缓存服务器不可用,程序直接绕过缓存从数据库中读取数据,否则认为缓存已经从故障中恢复,走正常流程。不管缓存服务器多久恢复,程序的消耗仅仅在于每经过一个缓存故障恢复期,都会尝试操作缓存一次。同时,每次捕捉到操作缓存的异常时,都更新一下上次缓存服务器故障时间戳。在我的程序里,设置了缓存故障恢复期为5分钟。基本上,这个处理已经将缓存服务器宕机带来的影响降到最低。

以下是Java代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//上次缓存服务器故障时间戳,默认为0。
public static long LAST_CACHED_EXCEPTION_TIMESTAMP = 0;
//缓存故障恢复时间
public static final long CACHED_EXCEPTION_RETRY_PERIOD = 5 * 60 * 1000;//5分钟
 
try {
    //不在缓存故障恢复期内才操作缓存。
    if (System.currentTimeMillis() - LAST_CACHED_EXCEPTION_TIMESTAMP >=
        CACHED_EXCEPTION_RETRY_PERIOD) {
        // 先尝试从cache中读取信息。
        user = getObjectFromCache(cacheKey);
    }
} catch (Exception e) {
    //更新上次缓存服务器故障时间戳。
    LAST_CACHED_EXCEPTION_TIMESTAMP = System.currentTimeMillis();
    e.printStackTrace();
}
 
if (user != null) {
    // cache中已存在该信息,返回。
    return user;  
} else {
    // cache中不存在该信息,先从数据库读,然后存入cache。
    user = getObjectFromDb();
 
    //不在缓存故障恢复期内才操作缓存。
    if (user != null
      && (System.currentTimeMillis() - LAST_CACHED_EXCEPTION_TIMESTAMP >= 
           CACHED_EXCEPTION_RETRY_PERIOD)) {
        try {
            addToCache(cacheKey, user);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
return credential;
}

--EOF--

几种提高数据库性能的改进措施

1. 使用缓存产品。将应用程序请求的热点数据放入缓存,减少数据库IO,对于需要频繁更新的操作(例如微博的关注人数、粉丝数、微博数等),可以进行异步操作,读写更新都放在缓存中,定时持久化到数据库中。此外,还可以根据具体的业务需求,选择合适的缓存类型,比如行缓存和页缓存的适用场景有很大的差异。

2. 建立高效的索引。例如通过一个字段查询另一些字段的业务需求,可以通过索引覆盖技术,直接从索引中得到查询结果。

3. 用CPU换IO。对于某些应用数据量大,但查询简单,导致IO负载高而CPU负载轻的场景(UGC网站),通过数据精简和压缩,降低IO。

4. 在应用程序的层面上优化SQL语句。例如mysql的order by rand语法,mysql的实现导致该语法在大数据量下非常低效,此时可以通过应用程序的一些变通方法实现,达到同样的效果。

--EOF--