分类目录归档:算法思想

基于Redis的分布式锁的简易实现

本文讨论的分布式锁适用于集群节点程序的互斥执行,注意此处的“分布式”是指该锁的适用场景是分布式系统,而非锁本身是分布式的,关于锁本身的分布式设计可以参考Redis官方介绍:『Distributed locks with Redis』。

之前在『如何用Spring实现集群环境下的定时任务』一文中提到利用Redis的"GET/SET+TTL"机制实现集群场景下定时任务的互斥执行功能,这实际上就是本文实现的原型。该文描述的方法的问题在于GET和SET方法的组合并非原子操作,在多进程并行执行场景下可能有多个客户端获得锁,从而破坏了锁的安全性。

本文的改进在两个方面:

1. 要解决分布式锁的安全性问题,需要使用Redis提供的锁原语:SETNX(since 1.0.0)或者SET NX EX(since 2.6.12)。这类命令的语义是:如果Key已存在,则返回SET成功,否则返回失败。

2. 使用注解方式加锁和解锁,避免代码重复和耦合。

以下是Java实现(Spring AOP)的原型:

1. 定义注解类,接收expire参数,表示此锁的过期时间。如果在定时任务中使用,一般要大于节点间的时间差,小于定时任务的时间间隔。

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface DLock {
    public String value() default "";

    public String expire() default "30";
}

2. 定义切面类,完成锁的申请和释放逻辑。

@Aspect
@Component
public class DLockAdvice {
    private static String DLOCK_PREFIX = "DLOCK_PREFIX_";

    @Autowired
    private RedisService redisService = null;

    @Around("@annotation(lock)")
    public Object lock(ProceedingJoinPoint pjp, DLock lock) throws Throwable {
        Long expire = Long.valueOf(lock.expire());;
        Signature sig = pjp.getSignature();
        String lockKey = DLOCK_PREFIX + sig.getDeclaringTypeName() + sig.getName() + expire;
        if (redisService.setValueIfAbsent(lockKey, true, expire)) {
            return pjp.proceed();
        } 
        return null;
    }
}

考虑到注解的通用性,锁名称的区分度越大越好,此处采用的“前缀+包名+类名+方法名+过期时间”,因此假如有个方法通过参数个数或者类型不同进行重载,则该锁会对这个重载的每个方法都生效,除非把参数个数或者类型信息加到锁名称里。

另外考虑到Redis服务本身或者网络的不稳定性,需要在RedisService的setValueIfAbsent()方法中对异常进行处理,假如:

1. 能够容忍多个客户端同时获得锁。那么当执行Redis命令异常时返回true。
2. 无法容忍多个客户端同时获得锁,宁愿没有客户端可以获得锁。那么当执行Redis命令异常时返回false。

--EOF--

整数的可变字节编码压缩算法

在各个平台中整数占用的字节数一直比较固定,通常是4个字节。它的表示的整数范围是-2147483648~2147483647。然而对于一些数值较小的整数,因为有大量的位数是前导0,这些比特在数值的表示中是没有意义的,仍旧花费4个字节去存储则显得有些浪费。这里的一篇文章『Variable byte codesd』,讲述正整数的可变字节编码的压缩,它可以在需要存储大量正整数的情况下有着较为实际的应用。

正整数可变字节编码压缩算法的思路是:
将每个字节分为2个部分:低7位为负载位(payload),用于存储数值,最高位为标志位(continuation bit),取值0或者1,用于标识当前字节是否是该整数在可变字节编码中的最后一个字节。

例如正整数130,它的二进制表示(4字节,共32位)为:00000000 00000000 00000000 10000010。经过可变字节编码压缩之后,130可压缩为2个字节:
Byte0: 0 0000010
Byte1: 1 0000001
Byte0的最高位为0,表示该字节并不是最后一个字节,低7位存储原比特流中的低7位。Byte1的最高位为1,表示该字节已经是最后一个字节,低7为存储原比特流中的第8位-第14位。舍弃原比特流中的所有前导0。

整数可变字节编码压缩算法的Java实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * 对正整数列表进行可变字节编码,返回压缩后的字节数组。
 * @param intList
 * @return
 */
private static List<Byte> intToByte(List<Integer> intList) {
    List<Byte> list = new ArrayList<Byte>();
    if (intList == null) {
        return null;
    }
    for (int n : intList) {
        //遍历列表中的每个正整数。
        while (n > 0) {
            int byteOf = n % 128; //得到正整数的低7位。
            if (n < 128) {
                //如果n值已经能由7位表示,则该字节是最后一个字节。
                byteOf += 128; //将该字节的最高位置为1。
                list.add((byte) byteOf);
                break; //当前整数的可变字节编码结束。
            } else {
                list.add((byte) byteOf);
            }
            n /= 128;
        }
    }
    return list;
}
 
/**
 * 将可变字节编码形式存储的字节数组还原为正整数列表。
 * @param byteList
 * @return
 */
private static List<Integer> byteToInt(List<Byte> byteList) {
    List<Integer> list = new ArrayList<Integer>();
    int n = 0;
    int byteStartPerInt = 0;
    for (int i = 0; i < byteList.size(); i++) {
        //依次读取字节数组。
        if (byteList.get(i) >= 0) {
            //如果当前字节的值大于0,则表示最高位是0,该字节不是最后的字节。
            n += byteList.get(i) * Math.pow(128, i - byteStartPerInt);
        } else {
            //如果当前字节的值小于0,则表示最高位是1,该字节是最后的字节。
            n += (byteList.get(i) + 128) * Math.pow(128, i - byteStartPerInt);
            list.add(n);
            n = 0;
            byteStartPerInt = i + 1;
        }
    }
    return list;
}

经过简单的测试,随机生成1w个值为1-105之间的正整数,上述算法的压缩比约为0.29。也就是说,本来需要4w个字节存储的整数现在只需要2.8w左右的存储空间。随着测试数据的值越大,压缩率会变小,而当测试数据的值越小,压缩率会变大。极端情况下,上述算法的压缩率最大能达到75%。

由于可变字节编码压缩算法是建立在标识位的基础上的,因此,当待编码的正整数大于1284时(针对4个字节表示整数的情况),算法会失去作用,非但不能压缩,反而会引起数据膨胀,这也是该算法的最大缺陷。另外,如果对待编码正整数的顺序没有要求的话,可以先对整数列表排序,然后存储相邻两个正整数之间的差值,通过这样的操作之后,待编码的整数就以“差值”的形式变小了,从而提高压缩率。

--EOF--

Base64编码算法

严格意义上,Base64编码不算是一种加解密算法,它的主要作用是能用可打印字符表示二进制数据,因此在mime email和xml中都较通用。虽然经Base64编码后的字符串看似无规则,实际上只要知道了算法就能轻易解码/解密。

Base64编码是将源字符串以3个字节为单位,将这24bit分成4组,每组6bit。再在每组6bit数据左侧补2bit的0,凑足8bit,即1个字节。经过这样的变换,原先3个字节(24bit)的数据就被编码成了4个字节(32bit)的数据。然后以这4个字节的值为索引,分别去字符对照表中取出相应的字符即完成编码。编码后每个字节的值不会大于63,字符对照表索引0-25为大写英文字母,26-51为小写英文字母,52-61为数字0-9,62为'+',63为'/'。由此可见, Base64的编码算法是完全可逆的,一个编码后的字符串依次进行上述过程的逆操作就能实现解码。

对于源字符串不足3个字节的情况,Base64编码要经过两个步骤的补位操作,第一步是当目前bit数模6不等于0时,则在其右端补0,直到模6等于0为止。第二种是在第一步基础上,如果经过补0后的bit数除6小于4,那么小几个就补几个等号('=')。根据这个规则,一个Base64编码后的字符串最多只会在其末尾包含2个等号。因为最坏情况下,当源字符串只包含1个字节(8bit)时,首先会在其右端补4bit的0,凑足12bit(达到6的倍数),12除以6等于2,故需要填充4-2=2个等号。最好情况下,当源字符串的字节数正好是3个倍数时,Base64编码的冗余率为33%,其余情况下,冗余率会稍稍大于这个数字。

此外,RFC2045还规定,编码后的字符串每76个字符就要换行,换行是指每76个字符就要加回车换行符(\r\n)。最后一行即使未满76个字符也要加回车换行符。

下面以一个简单的例子说明:
字符串"AB"的ASCII码值为65和66,换成二进制是0100,0001和0100,0010。共16bit,不满足模6等于0,故需要在其右侧补2个0。
经右侧补0后的bit流如下:010000010100001000。
现以6个bit为一组分成3组,如下:010000,010100,001000。再在每个分组左侧补0凑足8bit,经左侧补0后形成的3个字节的值分别为16,20和8,对应着字符对照表中的Q,U和I。因此,"AB"经Base64编码后的字符串为QUI=,外加一个回车换行符\r\n。

对于非ASCII字符来说,一个字符可能是由2-4个字节构成,例如UTF-8或者GBK编码下的中文。要计算它们的Base64编码,只需获取该编码方式下字符的字节数组,然后按字节对其进行Base64编码。

Java中可调用org.apache.commons.codec.binary.Base64类的encode和decode方法来对字符串进行编码和解码操作。

--EOF--

单链表快速排序算法

快速排序的算法思想是:确定一个枢轴元素,通过一趟排序将待排关键字分成两个部分,其中一部分元素均比枢轴元素小,另一部分元素均比枢轴元素大,这样的一趟排序称为快速排序的一次划分。再对两个部分分别再递归进行划分操作,最后获得的序列就是排好序的关键字序列。可见,快速排序的性能与枢轴元素的选取有关,快排的时间复杂度在最好、最坏、平均的情况下分别为:O(nlog2n)、O(n2)和O(nlog2n),空间复杂度在最好、最坏、平均情况下分别为:O(log2n)、O(n)和O(log2n),快排的C++实现和效率分析见『六种基本内部排序算法的C++实现』『六种基本内部排序算法效率分析』

上述快排算法的实现中采用的存储结构是数组,采用数组有一个好处,就是待排元素可以随机存取。而以单链表方式存储的待排序列就无法随机存取元素了,其实根据快排思想,稍作改动,对单链表的快速排序算法也能实现。

因为快排最核心的思想就是划分,确定一个枢轴元素(pivot),每一趟划分的目的就是把待排序列分为两部分,前一部分比枢轴小(序列A),后一部分比枢轴大(序列B)。经过一趟划分之后序列变为:{A} pivot {B}。以下是具体步骤:
1、确定每一次划分的枢轴元素为当前待排序列的头节点。
2、设置Slow和Fast两个游标,Slow指向序列A中的最后一个元素,初始化为枢轴本身(待排序列头节点)。让Fast遍历一遍待排序列,当所指元素比枢轴小时,将Slow往前游一格,交换Slow和Fast所指元素的值,这样仍能保证Slow指向的元素是序列A中的最后一个元素。
3、交换Slow所指元素和枢轴元素的值。
4、对序列A和B重复步骤1~4。

下面是单链表快速排序算法的Java实现:
1、单链表节点的定义。

1
2
3
4
5
/* 单链表节点 */
class Element {
    public int     data;
    public Element next;
}

2、单链表的快排算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * @param ListHead 待排序列的头节点
 * @param ListEnd 待排序列的尾节点
 */
public static void qsort(Element ListHead, Element ListEnd) {
    if (ListHead == null || ListEnd == null)
        return;
    if (ListHead == ListEnd) {
        return;
    }
    //Slow游标,指向序列A的最末尾元素。
    Element Slow = ListHead;
    //Fast游标,用于遍历整个待排序列。
    Element Fast = ListHead.next;
    //TempEnd游标,总是指向Slow游标的前驱节点,递归调用时需要。
    Element TempEnd = ListHead;
    int temp;
    while (Fast != null) {
        //当前节点的值比枢轴小,进行交换。
        if (Fast.data < ListHead.data) {
            //TempEnd游标总是指向Slow的前驱。
            TempEnd = Slow;
            Slow = Slow.next;
 
            //交换Slow和Fast游标所指的元素的值
            temp = Slow.data;
            Slow.data = Fast.data;
            Fast.data = temp;
        }
        Fast = Fast.next;
    }
 
    //交换Slow游标所指的元素和枢轴元素的值,使序列成为{A} povit {B}形式
    temp = ListHead.data;
    ListHead.data = Slow.data;
    Slow.data = temp;
 
    //递归调用
    qsort(ListHead, TempEnd);
    qsort(Slow.next, ListEnd);
}

--EOF--

Fibonacci数列的O(log2n)解法

Fibonacci数列(斐波那契数列)用递归实现最符合其定义要求,但是计算机实现起来的时间复杂度为O(2n)。改进一点的做法是通过一个O(n)的空间换取时间复杂度为O(n)的非递归计算方法。更好一些的方法就是通过Fibonacci数列的递归公式构建出一个矩阵方程,通过对矩阵方程的代数运算,可以在O(log2n)时间复杂度内得到任意第n个Fibonacci数列中数字。

由Fibonacci数列公式F(n+1) = F(n) + F(n-1),我们可以构建出下列的矩阵方程:

由此可以推导出:

因为F(0)=1,F(1)=1是已知的,所以只要求出矩阵(1, 1, 1, 0)n的值,那么F(n)值也就知道了。

对于两个数的乘方,我们可以用分而治之在O(log2n)的时间复杂度内求得,将它运用在矩阵计算中也是一样的。如下:

程序中只需先定义两个矩阵的乘法,然后便可以递归求得Mn的值。

1、首先定义矩阵类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * | m00 m01 |
 * | m10 m11 |
 */
class Matrix {
    public long m00;
    public long m01;
    public long m10;
    public long m11;
 
    public Matrix(long m00, long m01, long m10, long m11) {
        this.m00 = m00;
        this.m01 = m01;
        this.m10 = m10;
        this.m11 = m11;
    }
}

2、定义矩阵乘法

1
2
3
4
5
6
7
public static Matrix multiplyMatrix(Matrix m1, Matrix m2) {
    return new Matrix(
        m1.m00 * m2.m00 + m1.m01 * m2.m10, 
        m1.m00 * m2.m01 + m1.m01 * m2.m11,
        m1.m10 * m2.m00 + m1.m11 * m2.m10, 
        m1.m10 * m2.m01 + m1.m11 * m2.m11);
}

3、对矩阵M进行n次幂运算

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Matrix calculate(int n) {
    if (n == 1) {
        return new Matrix(1, 1, 1, 0);
    }
    if (n % 2 == 0) { //n为偶数
        Matrix m = calculate(n / 2);
        return multiplyMatrix(m, m);
    } else { //n为奇数
        Matrix m = calculate((n - 1) / 2);
        m = multiplyMatrix(m, m);
        return multiplyMatrix(m, new Matrix(1, 1, 1, 0));
    }
}

这样,求解第n个Fabonacci数列中数字的问题就被转化成为矩阵(1, 1, 1, 0)的n次幂运算,时间复杂度降为O(log2n)。

完整代码参见这里

Reference:
[1] Fibonacci Sequence. Wikipedia. 2011-10-21 accessed.

--EOF--