月度归档:2013年02月

利用JDK实现观察者模式

JDK自1.0开始就支持观察者模式,java.util包提供了观察者模式结构中的Subject(被观察者,目标)和Observer(观察者)的定义和实现,分别对应java.util.Observable类和java.util.Observer接口。

Observable类已实现的方法包括注册观察者addObserver、移除观察者deleteObserver、设置状态改变标记setChanged,以及通知观察者类刷新数据的notifyObservers方法。Observer接口声明了update方法,当接到通知时用于刷新数据。

以下是利用Observable类和Observer接口实现的观察者模式:
1. 观察者ConcreteObserver类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package cn.edu.zju.dp.observer;
 
import java.util.Observable;
import java.util.Observer;
 
/**
 * @author Feng
 * @version $Id: ConcreteObserver.java, v 1.0 2013-2-28 下午07:54:56
 */
public class ConcreteObserver implements Observer {
 
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("Observerable: " + ((ConcreteObservable) o).getName() 
                + ", Params: " + arg);
    }
}

2. 被观察者(目标)ConcreteObservable类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package cn.edu.zju.dp.observer;
 
import java.util.Observable;
 
/**
 * @author Feng
 * @version $Id: ConcreteObservable.java, v 1.0 2013-2-28 下午07:58:00
 */
public class ConcreteObservable extends Observable {
    private String name;
 
    public String getName() {
        return name;
    }
 
    public void setName1(String name) {
        this.name = name;
        setChanged(); //设置状态变更标记
        notifyObservers(); //通知所有观察者。
    }
 
    public void setName2(String name) {
        this.name = name;
        setChanged(); //设置状态变更标记
        notifyObservers(name);//通知所有观察者,同时向所有观察者传递自定义消息。
    }
}

3. 测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package cn.edu.zju.dp.observer;
 
/**
 * @author Feng
 * @version $Id: ObserverTest.java, v 1.0 2013-2-28 下午08:01:29
 */
public class ObserverTest {
 
    public static void main(String[] args) {
        ConcreteObserver observer = new ConcreteObserver();
        ConcreteObservable observable = new ConcreteObservable();
        //注册观察者,当observable状态更新时,observer会收到通知。
        observable.addObserver(observer);
 
        observable.setName1("foo");//输出:Observerable: foo, Params: null
        observable.setName2("bar");//输出:Observerable: bar, Params: bar
    }
}

运行结果如下:

1
2
Observerable: foo, Params: null
Observerable: bar, Params: bar

被观察者ConcreteObservable类数据更新(setName)后,调用setChanged设置内部标记,表示状态更新(此时调用基类的hasChanged方法返回true),同时调用notifyObservers方法通知注册过的观察者。观察者类收到通知后,调用update方法刷新数据。update方法有类型分别为Observerable和Object的两个参数,前者是发出通知的被观察者(目标)类的引用,后者方便被观察者类向观察者发送通知时指定参数。可以理解为:Observerable参数支持观察者类从被观察者类“拉”数据,Object参数支持被观察者类向观察者类“推”数据。

JDK为什么把Observable设计成类,Observer设计成接口呢?知乎上有一个问答(『Java 的jdk中 为什么Observer 是接口,Observable 是类?』)靠谱:为所有被观察者对象设置内部状态变更标记,注册、移除、通知观察者的方法是相同的,将它们封装成类可以极大简化编写观察者模式程序,用户只需在Observable的子类中根据业务逻辑需要合理调用上述方法即可。而对于观察者而言,update方法中实现的是具体业务逻辑,例如可能刷新图表、更新表单数据、启动定时任务等,因此必须作为一个接口提供,由用户实现。

--EOF--

RabbitMQ Tutorials笔记

0. 基本概念:broke, consumer,producer,queue,exchange, bindKey, routeKey, ack.

1. channel.basicConsume(QUEUE_NAME, true, consumer)方法调用后,服务器端开始向消费者投递消息,投递消息是个异步过程,因此消费者需要提供一个回调临时队列,用于缓存消息,临时队列封装在QueueingConsumer对象中,以LinkedBlockingQueue类型存在,因此,如果不注意,服务器端会源源不断向消费者投递消息,直到挤爆内存。

1
2
3
4
5
6
7
8
9
10
LinkedBlockingQueue<QueueingConsumer.Delivery> bq = new 
    LinkedBlockingQueue<QueueingConsumer.Delivery>();
QueueingConsumer consumer = new QueueingConsumer(channel, bq);
channel.basicConsume(QUEUE_NAME, false, consumer);
 
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
}

2. 当一个队列上有多个消费者时,broke会以round-robin的方式按序投递消息,每个消费者得到平均数量的消息。

3. 默认情况下,broke会要求消费者显式对消息进行确认(channel.basicAck),broke收到ack消息后才从服务器端删去消息。RabbitMQ并没有对ack的时间进行限制,未做超时控制,这样能使得消费者有充足的时间处理消息。重发消息的唯一触发点是目标消费者连接断开,这时broke会重新选择一个消费者投递此消息。

4. 服务器端的消息可靠性需由队列持久化机制和消息持久化机制共同保障。队列持久化通过channel.queueDeclare方法的durable参数为true来实现。消息持久化通过channel.basicPublish方法的BasicProperties参数设置为MessageProperties.PERSISTENT_TEXT_PLAIN来实现。一条队列一旦声明完毕,就能不能再改变其属性,包括是否持久化durable、autoDelete, exclusive等参数。另外持久化参数并未保证消息100%写入磁盘,它可能只是写到磁盘缓存,在刷到磁盘之前服务器挂掉仍会造成消息丢失,确保消息完全可靠需配合生产者confirm机制。

5. channel.basicQos使得broke可以根据消费者负载投递消息,消费者指定basicQos参数,broke会根据参数值最多投递多少条消息过去,等消费者ack消息后继续投递,因此channel.basicQos需跟channel.basicAck配合使用,能解决round-robin算法带来的实际负载不均衡缺陷。

6. exchange负责路由,它从生产者端接收消息,根据策略(消息的routeKey和队列exchange之间的bindKey)将消息放入不同队列。消费者从队列消费消息。exchange有fanout, direct, topic等几种类型。fanout是广播类型,它忽略channel.basicPublish的routeKey参数,将消息投递到所有与自己bind的队列。direct是直接投递类型,需要消息的routeKey和队列bindKey严格匹配才能投递。topic类型介于两者之间,它要求routeKey和bindKey必须以点号分隔的字符串形式出现,其中bindKey和routeKey均支持*和#通配符,前者表示严格匹配一个单词,后者表示匹配0个或多个单词。当bindKey为#时,topic类型同fanout类型。当bindKey不含*和#通配符时,topic类型退化为direct类型。

7. 队列和exchange之间可以有多个bindKey,当一条消息的routeKey匹配多个bindKey时,同一个队列收到的消息仍是一条。

8. channel.basicPublish如果不指定队列名称,则broke默认会将消息投递到以routeKey命名的队列中,如果此队列不存在,则消息丢失。

9. 为了消息可靠性考虑,一般exchange和queue以及binding会在生产者和消费者两端一起声明。生产者声明queue为了保证消息发送到exchange后可以入队列,此时如果队列不存在,消息会丢弃。消费者声明exchange可以保证binding正确进行,若exchange不存在,binding异常退出。

--EOF--

『长尾理论』

长尾理论可以定义为:文化和经济重心正在加速转移,从需求曲线头部的少数大热门(主流产品和市场)转向需求曲线尾部的大量利基产品和市场。有一个问题的答案能体现长尾的价值,收录在Ecast公司曲库中的一万张专辑中有多少张能达到每一季度至少被点播一次的频率?答案不是什么20/80法则,而是98%。这个数字意味着,曲库中98%的专辑能带来直接经济收益,商家从长尾理论中获益有一个数学集合论上的原理支持:一个极大极大的数(长尾中的产品)乘以一个相对较小的数(每一种长尾产品的销量),仍然等于一个极大极大的数,而且这个极大极大的数只会变得越来越大。

在超市兴起之前,社会商业处在一个规模经济占主流地位的阶段,各个门店都只提供热门商品和主流商品的交易,因为规模经济是一个零和游戏:由于受限于库房和门面租金,非主流产品势必挤占主流产品的货品展示空间,降低利润收入。超市之类的大卖场出现之后,直至目前亚马逊、天猫等电子商务公司大行其道,规模经济在一定程度上向范围经济转变,因为种类越多,仓储物流成本越低,长尾价值开始体现。然而在网络上建立页面索引,货物存储在库房中,虽然能降低门面租金,但是存储长尾产品占用的库房资源仍然显得不划算。安德森认为,只有等电子书、数字音乐、数字电影等完全兴起,才是榨取长尾价值的最终手段,因为网络的空间是无限的,不再有任何关于货架、库房等物理空间的租金。所以真正体现长尾价值的经济模式是互联网经济。

长尾效应无处不在,不仅体现在产品之间,也体现在同一个产品的不同时间维度上。今天的热门必然会成为明天的长尾,这种相同产品的长尾价值也是值得挖掘的,线上线下的二手市场就是这类长尾效应套利的最佳场所。

--EOF--