Category Archives: 高并发

消息幂等(去重)通用解决方案

消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。

举个例子,一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。

然而这种可靠的特性导致,消息可能被多次地投递。举个例子,还是刚刚这个例子,程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

这在RockectMQ的场景来看,就是同一个messageId的消息重复投递下来了。

基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么RocketMQ的文档里强调的,消费逻辑需要自我实现幂等。背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。

简单的消息去重解决方案

例如:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存:

1
2
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现消息的幂等,我们可能会采取这样的方案:

1
2
3
4
5
6
7
select * from t_order where order_no = 'order123'

if(order  != null) {

    return ;//消息重复,直接返回

}

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

并发重复消息

假设这个消费的所有代码加起来需要1秒,有重复的消息在这1秒内(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),那么很可能,上面去重代码里面会发现,数据依然是空的(因为上一条消息还没消费完,还没成功更新订单状态),

那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题(如主键冲突抛出异常、库存被重复扣减而没释放等)

并发去重的解决方案之一

要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把select 改成 select for update语句,把记录进行锁定。

1
2
3
4
select * from t_order where order_no = 'THIS_ORDER_NO' for update  //开启事务
if(order.status != null) {
    return ;//消息重复,直接返回
}

但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费可能变长,并发度下降。

当然还有其他更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本文讨论的范围。

但无论是select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 一个业务系统里面很大部分的请求处理都是依赖MQ的,如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。本文希望探索出一个通用的消息幂等处理的方法,从而抽象出一定的工具类用以适用各个业务场景。

Exactly Once

在消息中间件里,有一个投递语义的概念,而这个语义里有一个叫”Exactly Once”,即消息肯定会被成功消费,并且只会被消费一次。以下是阿里云里对Exactly Once的解释:

Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是Exactly Once。

但这在分布式的场景下想找一个通用的方案几乎是不可能的。不过如果是针对基于数据库事务的消费逻辑,实际上是可行的。

基于关系数据库事务插入消息表

假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态:

1
update t_order set status = 'SUCCESS' where order_no= 'order123';

要实现Exaclty Once即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做:在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。

  1. 开启事务
  2. 插入消息表(处理好主键冲突的问题)
  3. 更新订单表(原消费逻辑)
  4. 提交事务

说明:

  1. 这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。
  2. 如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。

事实上,阿里云ONS的EXACTLY-ONCE语义的实现上,就是类似这个方案基于数据库的事务特性实现的。更多详情可参考:https://help.aliyun.com/document_detail/102777.html

基于这种方式,的确这是有能力拓展到不同的应用场景,因为他的实现方案与具体业务本身无关——而是依赖一个消息表。

但是这里有它的局限性

  1. 消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。
  2. 数据库的数据必须是在一个库,跨库无法解决

注:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。

更复杂的业务场景

如上所述,这种方式Exactly Once语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值。并且由于基于事务,可能导致锁表时间过长等性能问题。

例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步(以下统称为步骤X):

  1. 检查库存(RPC)
  2. 锁库存(RPC)
  3. 开启事务,插入订单表(MySQL)
  4. 调用某些其他下游服务(RPC)
  5. 更新订单状态
  6. commit 事务(MySQL)

这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的。怎么说呢,就是说有可能第一条小在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚。当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持“幂等”。

再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发。所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加select for update,或者使用乐观锁。

那我们有没有方法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢?

拆解消息执行过程

其中一个思路是把上面的几步,拆解成几个不同的子消息,例如:

  1. 库存系统消费A:检查库存并做锁库存,发送消息B给订单服务
  2. 订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费
  3. 下游系统消费消息C:处理部分逻辑,发送消息D给订单系统
  4. 订单系统消费消息D:更新订单状态

注:上述步骤需要保证本地事务和消息是一个事务的(至少是最终一致性的),这其中涉及到分布式事务消息相关的话题,不在本文论述。

可以看到这样的处理方法会使得每一步的操作都比较原子,而原子则意味着是小事务,小事务则意味着使用消息表+事务的方案显得可行。

然而,这太复杂了!这把一个本来连续的代码逻辑割裂成多个系统多次消息交互!那还不如业务代码层面上加锁实现呢。

更通用的解决方案

上面消息表+本地事务的方案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。

如果我们能不依赖事务而实现消息的去重,那么方案就能推广到更复杂的场景例如:RPC、跨库等。

例如,我们依旧使用消息表,但是不依赖事务,而是针对消息表增加消费状态,是否可以解决问题呢?

基于消息幂等表的非事务方案

dedup-solution-01

以上是去事务化后的消息幂等方案的流程,可以看到,此方案是无事务的,而是针对消息表本身做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。而对于已有消费中的消息,后面重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同一个消息id的话),因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉broker成功了,那么第一条消息这时候失败broker也不会重新投递了)

上面的流程不再细说,后文有github源码的地址,读者可以参考源码的实现,这里我们回头看看我们一开始想解决的问题是否解决了:

  1. 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
  2. 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
  3. 支持上游业务生产者重发的业务重复的消息幂等问题。

关于第一个问题已经很明显已经解决了,在此就不讨论了。

关于第二个问题是如何解决的?主要是依靠插入消息表的这个动作做控制的,假设我们用MySQL作为消息表的存储媒介(设置消息的唯一ID为主键),那么插入的动作只有一条消息会成功,后面的消息插入会由于主键冲突而失败,走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。

关于第三个问题,只要我们设计去重的消息键让其支持业务的主键(例如订单号、请求流水号等),而不仅仅是messageId即可。所以也不是问题。

此方案是否有消息丢失的风险?

如果细心的读者可能会发现这里实际上是有逻辑漏洞的,问题出在上面聊到的个三问题中的第2个问题(并发场景),在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于一些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是消费中的状态,最后消费就会被视为消费失败而被投递到死信Topic中(RocketMQ默认可以重复消费16次)。

有这种顾虑是正确的!对于此,我们解决的方法是,插入的消息表必须要带一个最长消费过期时间,例如10分钟,意思是如果一个消息处于消费中超过10分钟,就需要从消息表中删除(需要程序自行实现)。所以最后这个消息的流程会是这样的:

dedup-solution-01

更灵活的消息表存储媒介

我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处:

  1. 性能上损耗更低
  2. 上面我们讲到的超时时间可以直接利用Redis本身的ttl实现

当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。

源码:RocketMQDedupListener

以上方案针对RocketMQ的Java实现已经开源放到Github中,具体的使用文档可以参考https://github.com/Jaskey/RocketMQDedupListener ,

以下仅贴一个Readme中利用Redis去重的使用样例,用以意业务中如果使用此工具加入消息去重幂等的是多么简单:

1
2
3
4
5
6
7
8
9
10
11
        //利用Redis做幂等表
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
        consumer.subscribe("TEST-TOPIC", "*");

        String appName = consumer.getConsumerGroup();// 大部分情况下可直接使用consumer group名
        StringRedisTemplate stringRedisTemplate = null;// 这里省略获取StringRedisTemplate的过程
        DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
        DedupConcurrentListener messageListener = new SampleListener(dedupConfig);

        consumer.registerMessageListener(messageListener);
        consumer.start();

以上代码大部分是原始RocketMQ的必须代码,唯一需要修改的仅仅是创建一个DedupConcurrentListener示例,在这个示例中指明你的消费逻辑和去重的业务键(默认是messageId)。

更多使用详情请参考Github上的说明。

这种实现是否一劳永逸?

实现到这里,似乎方案挺完美的,所有的消息都能快速的接入去重,且与具体业务实现也完全解耦。那么这样是否就完美的完成去重的所有任务呢?

很可惜,其实不是的。原因很简单:因为要保证消息至少被成功消费一遍,那么消息就有机会消费到一半的时候失败触发消息重试的可能。还是以上面的订单流程X:

  1. 检查库存(RPC)
  2. 锁库存(RPC)
  3. 开启事务,插入订单表(MySQL)
  4. 调用某些其他下游服务(RPC)
  5. 更新订单状态
  6. commit 事务(MySQL)

当消息消费到步骤3的时候,我们假设MySQL异常导致失败了,触发消息重试。因为在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤1和步骤2就会重新再执行一遍。如果步骤2本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。

本实现方式的价值?

那么既然这个并不能完整的完成消息幂等,还有什么价值呢?价值可就大了!虽然这不是解决消息幂等的银弹(事实上,软件工程领域里基本没有银弹),但是他能以便捷的手段解决:

1.各种由于Broker、负载均衡等原因导致的消息重投递的重复问题

2.各种上游生产者导致的业务级别消息重复问题

3.重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑

一些其他的消息去重的建议

也就是说,使用这个方法能保证正常的消费逻辑场景下(无异常,无异常退出),消息的幂等工作全部都能解决,无论是业务重复,还是rocketmq特性带来的重复。

事实上,这已经能解决99%的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率:

  1. 消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。
  2. 消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。
  3. 一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)
  4. 在#3做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好#1的回滚,使得下次重试消费成功。

Java concurrency (multi-threading)

Table of Contents
Java concurrency (multi-threading). This article describes how to do concurrent programming with Java. It covers the concepts of parallel programming, immutability, threads, the executor framework (thread pools), futures, callables CompletableFuture and the fork-join framework.

1. Concurrency

1.1. What is concurrency?

Concurrency is the ability to run several programs or several parts of a program in parallel. If a time consuming task can be performed asynchronously or in parallel, this improve the throughput and the interactivity of the program.

A modern computer has several CPU’s or several cores within one CPU. The ability to leverage these multi-cores can be the key for a successful high-volume application.

1.2. Process vs. threads

A process runs independently and isolated of other processes. It cannot directly access shared data in other processes. The resources of the process, e.g. memory and CPU time, are allocated to it via the operating system.

A thread is a so called lightweight process. It has its own call stack, but can access shared data of other threads in the same process. Every thread has its own memory cache. If a thread reads shared data it stores this data in its own memory cache. A thread can re-read the shared data.

A Java application runs by default in one process. Within a Java application you work with several threads to achieve parallel processing or asynchronous behavior.

2. Improvements and issues with concurrency

2.1. Limits of concurrency gains

Within a Java application you work with several threads to achieve parallel processing or asynchronous behavior. Concurrency promises to perform certain task faster as these tasks can be divided into subtasks and these subtasks can be executed in parallel. Of course the runtime is limited by parts of the task which can be performed in parallel.

The theoretical possible performance gain can be calculated by the following rule which is referred to as Amdahl’s Law.

If F is the percentage of the program which can not run in parallel and N is the number of processes, then the maximum performance gain is 1 / (F+ ((1-F)/n)).

2.2. Concurrency issues

Threads have their own call stack, but can also access shared data. Therefore you have two basic problems, visibility and access problems.

A visibility problem occurs if thread A reads shared data which is later changed by thread B and thread A is unaware of this change.

An access problem can occur if several thread access and change the same shared data at the same time.

Visibility and access problem can lead to

  • Liveness failure: The program does not react anymore due to problems in the concurrent access of data, e.g. deadlocks.
  • Safety failure: The program creates incorrect data.

3. Concurrency in Java

3.1. Processes and Threads

A Java program runs in its own process and by default in one thread. Java supports threads as part of the Java language via the Thread code. The Java application can create new threads via this class.

Java 1.5 also provides improved support for concurrency with the in the java.util.concurrent package.

3.2. Locks and thread synchronization

Java provides locks to protect certain parts of the code to be executed by several threads at the same time. The simplest way of locking a certain method or Java class is to define the method or class with the synchronized keyword.

The synchronized keyword in Java ensures:

  • that only a single thread can execute a block of code at the same time
  • that each thread entering a synchronized block of code sees the effects of all previous modifications that were guarded by the same lock

Synchronization is necessary for mutually exclusive access to blocks of and for reliable communication between threads.

You can use the synchronized keyword for the definition of a method. This would ensure that only one thread can enter this method at the same time. Another threads which is calling this method would wait until the first threads leaves this method.

public synchronized void critial() {
    // some thread critical stuff
    // here
}

You can also use the synchronized keyword to protect blocks of code within a method. This block is guarded by a key, which can be either a string or an object. This key is called the lock.

All code which is protected by the same lock can only be executed by one thread at the same time

For example the following datastructure will ensure that only one thread can access the inner block of the add() and next() methods.

package de.vogella.pagerank.crawler;

import java.util.ArrayList;
import java.util.List;

/**
 * Data structure for a web crawler. Keeps track of the visited sites and keeps
 * a list of sites which needs still to be crawled.
 *
 * @author Lars Vogel
 *
 */
public class CrawledSites {
    private List<String> crawledSites = new ArrayList<String>();
    private List<String> linkedSites = new ArrayList<String>();

    public void add(String site) {
        synchronized (this) {
            if (!crawledSites.contains(site)) {
                linkedSites.add(site);
            }
        }
    }

    /**
     * Get next site to crawl. Can return null (if nothing to crawl)
     */
    public String next() {
        if (linkedSites.size() == 0) {
            return null;
        }
        synchronized (this) {
            // Need to check again if size has changed
            if (linkedSites.size() > 0) {
                String s = linkedSites.get(0);
                linkedSites.remove(0);
                crawledSites.add(s);
                return s;
            }
            return null;
        }
    }

}

3.3. Volatile

If a variable is declared with the volatile keyword then it is guaranteed that any thread that reads the field will see the most recently written value. The volatile keyword will not perform any mutual exclusive lock on the variable.

As of Java 5 write access to a volatile variable will also update non-volatile variables which were modified by the same thread. This can also be used to update values within a reference variable, e.g. for a volatile variable person. In this case you must use a temporary variable person and use the setter to initialize the variable and then assign the temporary variable to the final variable. This will then make the address changes of this variable and the values visible to other threads.

4. The Java memory model

4.1. Overview

The Java memory model describes the communication between the memory of the threads and the main memory of the application.

It defines the rules how changes in the memory done by threads are propagated to other threads.

The Java memory model also defines the situations in which a thread re-fresh its own memory from the main memory.

It also describes which operations are atomic and the ordering of the operations.

4.2. Atomic operation

An atomic operation is an operation which is performed as a single unit of work without the possibility of interference from other operations.

The Java language specification guarantees that reading or writing a variable is an atomic operation(unless the variable is of type long or double ). Operations variables of type long or double are only atomic if they declared with the volatile keyword.

Assume i is defined as int. The i++ (increment) operation it not an atomic operation in Java. This also applies for the other numeric types, e.g. long. etc).

The i++ operation first reads the value which is currently stored in i (atomic operations) and then it adds one to it (atomic operation). But between the read and the write the value of i might have changed.

Since Java 1.5 the java language provides atomic variables, e.g. AtomicInteger or AtomicLong which provide methods like getAndDecrement(), getAndIncrement() and getAndSet() which are atomic.

4.3. Memory updates in synchronized code

The Java memory model guarantees that each thread entering a synchronized block of code sees the effects of all previous modifications that were guarded by the same lock.

5. Immutability and Defensive Copies

5.1. Immutability

The simplest way to avoid problems with concurrency is to share only immutable data between threads. Immutable data is data which cannot changed.

To make a class immutable make

  • all its fields final
  • the class declared as final
  • the this reference is not allowed to escape during construction
  • Any fields which refer to mutable data objects are
  • private
  • have no setter method
  • they are never directly returned of otherwise exposed to a caller
  • if they are changed internally in the class this change is not visible and has no effect outside of the class

An immutable class may have some mutable data which is uses to manages its state but from the outside this class nor any attribute of this class can get changed.

For all mutable fields, e.g. Arrays, that are passed from the outside to the class during the construction phase, the class needs to make a defensive-copy of the elements to make sure that no other object from the outside still can change the data

5.2. Defensive Copies

You must protect your classes from calling code. Assume that calling code will do its best to change your data in a way you didn’t expect it. While this is especially true in case of immutable data it is also true for non-immutable data which you still not expect that this data is changed outside your class.

To protect your class against that you should copy data you receive and only return copies of data to calling code.

The following example creates a copy of a list (ArrayList) and returns only the copy of the list. This way the client of this class cannot remove elements from the list.

package de.vogella.performance.defensivecopy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MyDataStructure {
    List<String> list = new ArrayList<String>();

    public void add(String s) {
        list.add(s);
    }

    /**
     * Makes a defensive copy of the List and return it
     * This way cannot modify the list itself
     *
     * @return List<String>
     */
    public List<String> getList() {
        return Collections.unmodifiableList(list);
    }
}

6. Threads in Java

The base means for concurrency are is the java.lang.Threads class. A Thread executes an object of type java.lang.Runnable.

Runnable is an interface with defines the run() method. This method is called by the Thread object and contains the work which should be done. Therefore the “Runnable” is the task to perform. The Thread is the worker who is doing this task.

The following demonstrates a task (Runnable) which counts the sum of a given range of numbers. Create a new Java project called de.vogella.concurrency.threads for the example code of this section.

package de.vogella.concurrency.threads;

/**
 * MyRunnable will count the sum of the number from 1 to the parameter
 * countUntil and then write the result to the console.
 * <p>
 * MyRunnable is the task which will be performed
 *
 * @author Lars Vogel
 *
 */
public class MyRunnable implements Runnable {
    private final long countUntil;

    MyRunnable(long countUntil) {
        this.countUntil = countUntil;
    }

    @Override
    public void run() {
        long sum = 0;
        for (long i = 1; i < countUntil; i++) {
            sum += i;
        }
        System.out.println(sum);
    }
}

The following example demonstrate the usage of the Thread and the Runnable class.

package de.vogella.concurrency.threads;

import java.util.ArrayList;
import java.util.List;

public class Main {

    public static void main(String[] args) {
        // We will store the threads so that we can check if they are done
        List<Thread> threads = new ArrayList<Thread>();
        // We will create 500 threads
        for (int i = 0; i < 500; i++) {
            Runnable task = new MyRunnable(10000000L + i);
            Thread worker = new Thread(task);
            // We can set the name of the thread
            worker.setName(String.valueOf(i));
            // Start the thread, never call method run() direct
            worker.start();
            // Remember the thread for later usage
            threads.add(worker);
        }
        int running = 0;
        do {
            running = 0;
            for (Thread thread : threads) {
                if (thread.isAlive()) {
                    running++;
                }
            }
            System.out.println("We have " + running + " running threads. ");
        } while (running > 0);

    }
}

Using the Thread class directly has the following disadvantages.

  • Creating a new thread causes some performance overhead.
  • Too many threads can lead to reduced performance, as the CPU needs to switch between these threads.
  • You cannot easily control the number of threads, therefore you may run into out of memory errors due to too many threads.

The java.util.concurrent package offers improved support for concurrency compared to the direct usage of Threads. This package is described in the next section.

7. Threads pools with the Executor Framework

You find this examples in the source section in Java project called de.vogella.concurrency.threadpools.

Thread pools manage a pool of worker threads. The thread pools contains a work queue which holds tasks waiting to get executed.

A thread pool can be described as a collection of Runnable objects.

(work queue) and a connections of running threads. These threads are constantly running and are checking the work query for new work. If there is new work to be done they execute this Runnable. The Thread class itself provides a method, e.g. execute(Runnable r) to add a new Runnable object to the work queue.

The Executor framework provides example implementation of the java.util.concurrent.Executor interface, e.g. Executors.newFixedThreadPool(int n) which will create n worker threads. The ExecutorService adds life cycle methods to the Executor, which allows to shutdown the Executor and to wait for termination.

If you want to use one thread pool with one thread which executes several runnables you can use the Executors.newSingleThreadExecutor() method.

Create again the Runnable.

package de.vogella.concurrency.threadpools;

/**
 * MyRunnable will count the sum of the number from 1 to the parameter
 * countUntil and then write the result to the console.
 * <p>
 * MyRunnable is the task which will be performed
 *
 * @author Lars Vogel
 *
 */
public class MyRunnable implements Runnable {
    private final long countUntil;

    MyRunnable(long countUntil) {
        this.countUntil = countUntil;
    }

    @Override
    public void run() {
        long sum = 0;
        for (long i = 1; i < countUntil; i++) {
            sum += i;
        }
        System.out.println(sum);
    }
}

Now you run your runnables with the executor framework.

package de.vogella.concurrency.threadpools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    private static final int NTHREDS = 10;

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
        for (int i = 0; i < 500; i++) {
            Runnable worker = new MyRunnable(10000000L + i);
            executor.execute(worker);
        }
        // This will make the executor accept no new threads
        // and finish all existing threads in the queue
        executor.shutdown();
        // Wait until all threads are finish
        executor.awaitTermination();
        System.out.println("Finished all threads");
    }
}

In case the threads should return some value (result-bearing threads) then you can use the java.util.concurrent.Callable class.

8. Futures and Callables

8.1. Futures and Callables

The executor framework presented in the last chapter uses Runnable objects. Unfortunately a Runnable cannot return a result to the caller.

In case you expect your threads to return a computed result you can use java.util.concurrent.Callable. The Callable object allows to return values after completion.

The Callable object uses generics to define the type of object which is returned.

If you submit a Callable object to an Executor, the framework returns an object of type java.util.concurrent.Future. Future exposes methods allowing a client to monitor the progress of a task being executed by a different thread. Therefore, a Future object can be used to check the status of a Callable. It can also be used to retrieve the result from the Callable.

On the Executor you can use the method submit to submit a Callable and to get a future. To retrieve the result of the future use the get() method.

package de.vogella.concurrency.callables;

import java.util.concurrent.Callable;

public class MyCallable implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long sum = 0;
        for (long i = 0; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }
}
package de.vogella.concurrency.callables;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableFutures {
    private static final int NTHREDS = 10;

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
        List<Future<Long>> list = new ArrayList<Future<Long>>();
        for (int i = 0; i < 20000; i++) {
            Callable<Long> worker = new MyCallable();
            Future<Long> submit = executor.submit(worker);
            list.add(submit);
        }
        long sum = 0;
        System.out.println(list.size());
        // now retrieve the result
        for (Future<Long> future : list) {
            try {
                sum += future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println(sum);
        executor.shutdown();
    }
}

8.2. Drawbacks with Futures and Callables

The Future interface is limited as a model of asynchronously executed tasks. Future allows a client to query a Callable task for its result. It does not provide the option to register a callback method. A callback method would allow you to get a callback once a task is done. In Java 5 you could use ExecutorCompletionService for this purpose but as of Java 8 you can use the CompletableFuture interface which allows to provide a callback interface which is called once a task is completed.

9. CompletableFuture

Asynchronous task handling is important for any application which performs time consuming activities, as IO operations. Two basic approaches to asynchronous task handling are available to a Java application:

  • application logic blocks until a task completes
  • application logic is called once the task completes, this is called a nonblocking approach.

CompletableFuture extends the functionality of the Future interface for asynchronous calls. It also implements the CompletionStage interface. CompletionStage offers methods, that let you attach callbacks that will be executed on completion.

It adds standard techniques for executing application code when a task completes, including various ways to combine tasks. CompletableFuture support both blocking and nonblocking approaches, including regular callbacks.

This callback can be executed in another thread as the thread in which the CompletableFuture is executed.

The following example demonstrates how to create a basic CompletableFuture.

CompletableFuture.supplyAsync(this::doSomething);

CompletableFuture.supplyAsync runs the task asynchronously on the default thread pool of Java. It has the option to supply your custom executor to define the ThreadPool.

package snippet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureSimpleSnippet {
    public static void main(String[] args) {
        long started = System.currentTimeMillis();

        // configure CompletableFuture
        CompletableFuture<Integer> futureCount = createCompletableFuture();

            // continue to do other work
            System.out.println("Took " + (started - System.currentTimeMillis()) + " milliseconds" );

            // now its time to get the result
            try {
              int count = futureCount.get();
                System.out.println("CompletableFuture took " + (started - System.currentTimeMillis()) + " milliseconds" );

               System.out.println("Result " + count);
             } catch (InterruptedException | ExecutionException ex) {
                // Exceptions from the future should be handled here
            }
    }

    private static CompletableFuture<Integer> createCompletableFuture() {
        CompletableFuture<Integer> futureCount = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        // simulate long running task
                        Thread.sleep(5000);
                    } catch (InterruptedException e) { }
                    return 20;
                });
        return futureCount;
    }

}

The usage of the thenApply method is demonstrated by the following code snippet.

package snippet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCallback {
    public static void main(String[] args) {
        long started = System.currentTimeMillis();

        CompletableFuture<String>  data = createCompletableFuture()
                .thenApply((Integer count) -> {
                    int transformedValue = count * 10;
                    return transformedValue;
                }).thenApply(transformed -> "Finally creates a string: " + transformed);

            try {
                System.out.println(data.get());
            } catch (InterruptedException | ExecutionException e) {

            }
    }

    public static CompletableFuture<Integer> createCompletableFuture() {
        CompletableFuture<Integer>  result = CompletableFuture.supplyAsync(() -> {
            try {
                // simulate long running task
                Thread.sleep(5000);
            } catch (InterruptedException e) { }
            return 20;
        });
        return result;
    }

}

10. Nonblocking algorithms

Java 5.0 provides supports for additional atomic operations. This allows to develop algorithm which are non-blocking algorithm, e.g. which do not require synchronization, but are based on low-level atomic hardware primitives such as compare-and-swap (CAS). A compare-and-swap operation check if the variable has a certain value and if it has this value it will perform this operation.

Non-blocking algorithms are typically faster than blocking algorithms, as the synchronization of threads appears on a much finer level (hardware).

For example this created a non-blocking counter which always increases. This example is contained in the project called de.vogella.concurrency.nonblocking.counter.

package de.vogella.concurrency.nonblocking.counter;

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    private AtomicInteger value = new AtomicInteger();
    public int getValue(){
        return value.get();
    }
    public int increment(){
        return value.incrementAndGet();
    }

    // Alternative implementation as increment but just make the
    // implementation explicit
    public int incrementLongVersion(){
        int oldValue = value.get();
        while (!value.compareAndSet(oldValue, oldValue+1)){
             oldValue = value.get();
        }
        return oldValue+1;
    }

}

And a test.

package de.vogella.concurrency.nonblocking.counter;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
        private static final int NTHREDS = 10;

        public static void main(String[] args) {
            final Counter counter = new Counter();
            List<Future<Integer>> list = new ArrayList<Future<Integer>>();

            ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
            for (int i = 0; i < 500; i++) {
                Callable<Integer> worker = new  Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        int number = counter.increment();
                        System.out.println(number );
                        return number ;
                    }
                };
                Future<Integer> submit= executor.submit(worker);
                list.add(submit);

            }


            // This will make the executor accept no new threads
            // and finish all existing threads in the queue
            executor.shutdown();
            // Wait until all threads are finish
            while (!executor.isTerminated()) {
            }
            Set<Integer> set = new HashSet<Integer>();
            for (Future<Integer> future : list) {
                try {
                    set.add(future.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            if (list.size()!=set.size()){
                throw new RuntimeException("Double-entries!!!");
            }

        }


}

The interesting part is how the incrementAndGet() method is implemented. It uses a CAS operation.

public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

The JDK itself makes more and more use of non-blocking algorithms to increase performance for every developer. Developing correct non-blocking algorithm is not a trivial task.

For more information on non-blocking algorithm, e.g. examples for a non-blocking Stack and non-block LinkedList, please see http://www.ibm.com/developerworks/java/library/j-jtp04186/index.html

11. Fork-Join in Java 7

Java 7 introduce a new parallel mechanism for compute intensive tasks, the fork-join framework. The fork-join framework allows you to distribute a certain task on several workers and then wait for the result.

For Java 6.0 you can download the package (jsr166y) from the Download site.

For testing create the Java project “de.vogella.performance.forkjoin”. If you are not using Java 7 you also need to jsr166y.jar to the classpath.

Create first a algorithm package and then the following class.

package algorithm;

import java.util.Random;

/**
 *
 * This class defines a long list of integers which defines the problem we will
 * later try to solve
 *
 */
public class Problem {
    private final int[] list = new int[2000000];

    public Problem() {
        Random generator = new Random(19580427);
        for (int i = 0; i < list.length; i++) {
            list[i] = generator.nextInt(500000);
        }
    }

    public int[] getList() {
        return list;
    }

}

Define now the Solver class as shown in the following example coding.

The API defines other top classes, e.g. RecursiveAction, AsyncAction. Check the Javadoc for details.
package algorithm;

import java.util.Arrays;

import jsr166y.forkjoin.RecursiveAction;

public class Solver extends RecursiveAction {
    private int[] list;
    public long result;

    public Solver(int[] array) {
        this.list = array;
    }

    @Override
    protected void compute() {
        if (list.length == 1) {
            result = list[0];
        } else {
            int midpoint = list.length / 2;
            int[] l1 = Arrays.copyOfRange(list, 0, midpoint);
            int[] l2 = Arrays.copyOfRange(list, midpoint, list.length);
            Solver s1 = new Solver(l1);
            Solver s2 = new Solver(l2);
            forkJoin(s1, s2);
            result = s1.result + s2.result;
        }
    }
}

Now define a small test class for testing it efficiency.

package testing;

import jsr166y.forkjoin.ForkJoinExecutor;
import jsr166y.forkjoin.ForkJoinPool;
import algorithm.Problem;
import algorithm.Solver;

public class Test {

    public static void main(String[] args) {
        Problem test = new Problem();
        // check the number of available processors
        int nThreads = Runtime.getRuntime().availableProcessors();
        System.out.println(nThreads);
        Solver mfj = new Solver(test.getList());
        ForkJoinExecutor pool = new ForkJoinPool(nThreads);
        pool.invoke(mfj);
        long result = mfj.getResult();
        System.out.println("Done. Result: " + result);
        long sum = 0;
        // check if the result was ok
        for (int i = 0; i < test.getList().length; i++) {
            sum += test.getList()[i];
        }
        System.out.println("Done. Result: " + sum);
    }
}

12. Deadlock

A concurrent application has the risk of a deadlock. A set of processes are deadlocked if all processes are waiting for an event which another process in the same set has to cause.

For example if thread A waits for a lock on object Z which thread B holds and thread B wait for a look on object Y which is hold be process A then these two processes are locked and cannot continue in their processing.

This can be compared to a traffic jam, where cars(threads) require the access to a certain street(resource), which is currently blocked by another car(lock).

Deadlock

Copyright © 2012-2017 vogella GmbH. Free use of the software examples is granted under the terms of the EPL License. This tutorial is published under the Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Germany license.

See Licence.

from:http://www.vogella.com/tutorials/JavaConcurrency/article.html

Pinterest谈实战经验:如何在两年内实现零到数百亿的月访问

Pinterest一直保持着指数增长,每一个半月都会翻一翻。在两年内,他们实现了从0到数百亿的月PV;从开始的两个创始人加一个工程师增长到现在超过40个工程师,从一个小型的MySQL服务器增长到180个Web Enigne、240个API Enigne、88个MySQL DB(cc2.8xlarge,每个DB都会配置一个从属节点)、110个Redis Instance以及200个Mmecache Instance。

在一个名为 《Scaling Pinterest》 的主题演讲上,Pinterest的Yashwanth NelapatiMarty Weiner为我们讲述了这个戏剧性的过程。当然扩展到当下规模,Pinterest在众多选择中不可避免的走了许多的弯路,而Todd Hoff认为其中最宝贵的经验该归结于以下两点:

  1. 如果你的架构应对增长所带来的问题时,只需要简单的投入更多的主机,那么你的架构含金量十足。
  2. 当你把事物用至极限时,这些技术都会以各自不同的方式发生故障,这导致他们对工具的选择有着特殊的偏好:成熟、简单、优秀、知名、被更多的用户喜爱、更好的支持、稳定且杰出的表现、通常情况下无故障以及免费。使用这些标准,他们选择了MySQL、Solr、Memcache、Redis、Cassandra,同时还抛弃了MongoDB。

同样这两个点是有关联的,符合第二个原则的工具就可以通过投入更多的主机进行扩展。即使负载的增加,项目也不会出现很多故障。即使真的出现难以解决的问题,至少有一个社区去寻找问题解决的方案。一旦你选择过于复杂和挑剔的工具,在扩展的道路上将充满荆棘。

需要注意的是所有他们选择的工具都依靠增加分片来进行扩展,而非通过集群。讲话中还阐述了为什么分片优于集群以及如何进行分片,这些想法可能是之前你闻所未闻的。

下面就看一下Pinterest扩展的阶段性时间轴:

项目背景

  • Pins是由其它零零碎碎信息集合成的图片,显示了对客户重要的信息,并且链接到它所在的位置。
  • Pinterest是一个社交网络,你可以follow(关注)其他人以及board。
  • 数据库:Pinterest的用户拥有board,而每个board都包含pin;follow及repin人际关系、验证信息。

1. 2010年3月发布——寻找真我的时代

在那时候,你甚至不知道需要建立一个什么样的产品。你有想法,所以你快速的迭代以及演变。而最终你将得到一些很小的MySQL查询,而这些查询在现实生活中你从未进行过。

Pinterest初期阶段的一些数字:

  • 2个创始人
  • 1个工程师
  • Rackspace
  • 1个小的网络引擎
  • 1个小的MySQL数据库
  • 2011年11月

仍然是小规模,产品通过用户反馈进行演变后的数字是:

  • Amazon EC2 + S3 + CloudFront
  • 1 NGinX, 4 Web Engines (用于冗余,不全是负载)
  • 1 MySQL DB + 1 Read Slave (用于主节点故障情况)
  • 1 Task Queue + 2 Task Processors
  • 1 MongoDB (用于计数)
  • 2 Engineers

2. 贯穿2011年——实验的时代

迈上疯狂增长的脚步,基本上每1个半月翻一翻。

  • 当你增长的如此之快,每一天每一星期你可能都需要打破或者抛弃一些东西。
  • 在这个时候,他们阅读大量的论文,这些论文都阐述着只需要添加一台主机问题就会得以解决。他们着手添加许多技术,随后又不得不放弃。
  • 于是出现了一些很奇怪的结果
  • Amazon EC2 + S3 + CloudFront
  • 2NGinX, 16 Web Engines + 2 API Engines
  • 5 Functionally Sharged MySQL DB + 9 read slaves
  • 4 Cassandra Nodes
  • 15 Membase Nodes (3 separate clusters)
  • 8 Memcache Nodes
  • 10 Redis Nodes
  • 3 Task Routers + 4 Task Processors
  • 4 Elastic Search Nodes
  • 3 Mongo Clusters
  • 3个工程师
  • 5个主数据库技术,只为了独立其中的数据。
  • 增长太快以至于MySQL疲于奔命,所有其它的技术也达到了极限。
  • 当你把事物用至极限时,这些技术都会以各自不同的方式出错。
  • 开始抛弃一些技术,并且自我反省究竟需要些什么,基本上重做了所有的架构。

3. 2012年2月——成熟的时代

  • 在重做了所有的架构后,系统呈现了如下状态
  • Amazon EC2 + S3 + Akamai, ELB
  • 90 Web Engines + 50 API Engines
  • 66 MySQL DBs (m1.xlarge) +,每个数据库都配备了从属节点
  • 59 Redis Instances
  • 51 Memcache Instances
  • 1 Redis Task Manager + 25 Task Processors
  • Sharded Solr
  • 6个工程师
  • 现在采用的技术是被分片的MySQL、Redis、Memcache和Solr,有点在于这些技术都很简单很成熟。
  • 网络传输增长仍然保持着以往的速度,而iPhone传输开始走高。

4. 2012年10月12日 —— 收获的季节

大约是1月份的4倍

  • 现在的数据是:
  • Amazon EC2 + S3 + Edge Cast,Akamai, Level 3
  • 180 Web Engines + 240 API Engines
  • 88 MySQL DBs (cc2.8xlarge) ,同样每个数据库都有一个从属节点
  • 110 Redis Instances
  • 200 Memcache Instances
  • 4 Redis Task Manager + 80 Task Processors
  • Sharded Solr
  • 40个工程师(仍在增长)
  • 需要注意的是,如今的架构已趋近完美,应对增长只需要投入更多的主机。
  • 当下已开始转移至SSD

下面一览该演讲中的干货,决策的制定:

为什么会选择EC2和S3

  1. 相当好的可靠性,即使数据中心发生故障。多租户会增加风险,但是也不是太坏。
  2. 良好的报告和支持。它们(EC2和S3)有着良好的架构,并且知道问题所在。
  3. 完善的周边设施,特别是在你需要快速增长时。你可以从APP Engine处获得maged cache、负载均衡、MapReduce、数据库管理以及其它你不想自己动手编写的组件,这可以加速你应用程序的部署,而在你工程师空闲时,你可以着手编写你需要的一切。
  4. 新的实例可以在几秒内就绪,这就是云的力量;特别是在只有两个工程师的初期,不需要去担心容量规划,更不需要花两个星期去建立自己的Memcache,你可以在数分钟内添加10个Memcached。
  5. 缺点:有限的选择。直到最近,才可以选择使用SSD,同时无法获得太大的内存配置。
  6. 优点:你不需要给大量的主机进行不同的配置。

为什么会选择MySQL

  1. 非常成熟。
  2. 非常稳定。不会宕机,并且不会丢失数据。
  3. 在招聘上具有优势,市场上有大把的人才。
  4. 在请求呈直线上升时,仍能将相应时间控制在一定的范围内,有些数据库技术在面对请求的飙升时表现并不是很好。
  5. 非常好的周边软件支持——XtraBackup、Innotop、Maatkit。
  6. 可以从类似Percona这样的公司得到优秀的技术支持。
  7. 开源(免费)——这一点非常重要,特别是在资金缺乏的初期

为什么使用Memcache

  • 非常成熟。
  • 非常简单。可以当成是一个socket哈希表
  • 杰出稳定的表现
  • 知名并为大量用户喜爱
  • 永不崩溃
  • 开源

为什么选择Redis

  • 虽然还不够成熟,但是非常简单及优秀
  • 提供了大量的数据结构类型
  • 提供多种的选择进行持久化和备份:你可以备份而非持久化,选择备份的话你还可以选择多久备份一次;同样你还可以选择使用什么方式进行持久化,比如MySQL等。
  • Home feed被储存在Redis上,每3个小时保存一次;然而并不是3个小时持久化一次,只是简单的每3个小时备份一次。
  • 如果你存储数据的主机发生故障,丢失的也只是备份周期内的数据。虽然不是完全可靠,但是非常简单。避免了复杂的持久化及复制,这样的架构简单且便宜。
  • 知名并为大量用户喜爱
  • 稳定且杰出的表现
  • 很少出故障。有一些专有的故障模型,你需要学会解决。这也是成熟的优势,只需要学习就可以解决。
  • 开源

Solr

  1. 只需要几分钟的安装时间,就可以投入使用
  2. 不能扩展到多于一台的机器上(最新版本并非如此)
  3. 尝试弹性搜索,但是以Pinterest的规模来说,可能会因为零碎文件和查询太多而产生问题。
  4. 选择使用Websolr,但是Pinterest拥有搜索团队,将来可能会开发自己的版本。

集群vs.分片

  • 在迅速扩展的过程中,Pinterest认识到每次负载的增加,都需要均匀的传播他们的数据。
  • 针对问题先确定解决方案的范围,他们选择的范围是集群和分片之间的一系列解决方案。

集群——所有的操作都是通过自动化

  • 比如:Cassandra、MemBase、HBase
  • 结论:没有安全感,将来可能会比较成熟,但是当下这个解决方案中还存在太多的复杂性和故障点。
  • 特性:
  • 数据自动分布
  • 节点间转移数据
  • 需要平衡分配
  • 节点间的相互通信,需要做很多措施用于防止干扰、无效传递及协商。
  • 优点:
  • 自动扩展你的数据存储,最起码论文中是这么说的。
  • 便于安装
  • 数据上的空间分布及机房共置。你可以在不同区域建立数据中心,数据库会帮你打理好一切。
  • 高有效性
  • 负载平衡
  • 不存在单点故障
  • 缺点:
  • 仍然不成熟。
  • 本质上说还很复杂。一大堆的节点必须对称协议,这一点非常难以解决。
  • 缺少社区支持。社区的讨论因为产品方向的不同而不能统一,而在每个正营中也缺乏强有力的支持。
  • 缺乏领域内资深工程师,可能大多数的工程师都还未使用过Cassandra。
  • 困难、没有安全感的机制更新。这可能是因为这些技术都使用API并且只在自己的领域内通行,这导致了复杂的升级路径。
  • 集群管理算法本身就用于处理SPOF(单点故障),如果存在漏洞的话可能就会影响到每个节点。
  • 集群管理器代码非常复杂,并且需要在所有节点上重复,这就可能存在以下的故障模式:
  • 数据平衡失控。当给集群中添加新的主机时,可能因为数据的拷贝而导致集群性能下降。那么你该做什么?这里不存在去发现问题所在的工具。没有社区可以用来求助,同样你也被困住了,这也是Pinterest回到MySQL的原因。
  • 跨节点的数据损坏。如果这里存在一个漏洞,这个漏洞可能会影响节点间的日志系统和压缩等其它组件?你的读延时增加,所有的数据都会陷入麻烦以及丢失。
  • 错误负载平衡很难被修复,这个现象十分普遍。如果你有10个节点,并且你注意到所有的负载都被堆积到一个节点上。虽然可以手动处理,但是之后系统还会将负载都加之一个节点之上。
  • 数据所有权问题,主次节点转换时的数据丢失。集群方案是非常智能的,它们会在特定的情况下完成节点权利的转换,而主次节点切换的过程中可能会导致数据的部分丢失,而丢失部分数据可能比丢失全部还糟糕,因为你不可能知道你究竟丢失了哪一部分。

分片——所有事情都是手动的

  • 结论:它是获胜者。Todd Hoff还认为他们的分片架构可能与Flickr架构类似。
  • 特性:
  • 分片可以让你摆脱集群方案中所有不想要的特性。
  • 数据需要手动的分配。
  • 数据不会移动。Pinterest永远都不会在节点间移动,尽管有些人这么做,这让他们在一定范围内站的更高。
  • 通过分割数据的方式分配负载。
  • 节点并没有互相通信,使用一些主节点控制程序的运行。
  • 优点:
  • 可以分割你的数据库以提高性能。
  • 空间分布及放置数据
  • 高有效性
  • 负载平衡
  • 放置数据的算法非常简单。主要原因是,用于处理单点故障的代码只有区区的半页,而不是一个复杂的集群管理器。并且经过短暂的测试就知道它是否能够正常工作。
  • ID生成非常简单
  • 缺点:
  • 不可以执行大多数的join。
  • 失去所有事务的能力。在一个数据库上的插入可能会成功,而在另一个上会失败。
  • 许多约束必须放到应用程序层。
  • 模式的转变需要从长计议。
  • 报告需要在所有分片上执行查询,然后需要手动的进行聚合。
  • Join在应用程序层执行。
  • 应用程序必须容忍以上所有问题。

什么时候进行分片

  1. 如果你的项目拥有PB级的数据,那么你需要立刻对其进行分片。
  2. Pin表格拥有百万行索引,索引大小已经溢出内存并被存入了磁盘。
  3. Pinterest使用了最大的表格,并将它们(这些索引)放入自己的数据库。
  4. 然后果断的超过了单数据库容量。
  5. 接着Pinterest必须进行分片。

分片的过渡

  • 过渡从一个特性的冻结开始。
  • 确认分片该达到什么样的效果——希望尽少的执行查询以及最少数量的数据库去呈现一个页面。
  • 剔除所有的MySQL join,将要做join的表格加载到一个单独的分片去做查询。
  • 添加大量的缓存,基本上每个查询都需要被缓存。
  • 这个步骤看起来像:
  • 1 DB + Foreign Keys + Joins
  • 1 DB + Denormalized + Cache
  • 1 DB + Read Slaves + Cache
  • Several functionally sharded DBs+Read Slaves+Cache
  • ID sharded DBs + Backup slaves + cache
  • 早期的只读从属节点一直都存在问题,因为存在slave lag。读任务分配给了从属节点,然而主节点并没有做任何的备份记录,这样就像一条记录丢失。之后Pinterest使用缓存解决了这个问题。
  • Pinterest拥有后台脚本,数据库使用它来做备份。检查完整性约束、引用。
  • 用户表并不进行分片。Pinterest只是使用了一个大型的数据库,并在电子邮件和用户名上做了相关的一致性约束。如果插入重复用户,会返回失败。然后他们对分片的数据库做大量的写操作。

如何进行分片

  • 可以参考Cassandra的ring模型、Membase以及Twitter的Gizzard。
  • 坚信:节点间数据传输的越少,你的架构越稳定。
  • Cassandra存在数据平衡和所有权问题,因为节点们不知道哪个节点保存了另一部分数据。Pinterest认为应用程序需要决定数据该分配到哪个节点,那么将永远不会存在问题。
  • 预计5年内的增长,并且对其进行预分片思考。
  • 初期可以建立一些虚拟分片。8个物理服务器,每个512DB。所有的数据库都装满表格。
  • 为了高有效性,他们一直都运行着多主节点冗余模式。每个主节点都会分配给一个不同的可用性区域。在故障时,该主节点上的任务会分配给其它的主节点,并且重新部署一个主节点用以代替。
  • 当数据库上的负载加重时:
  • 先着眼节点的任务交付速度,可以清楚是否有问题发生,比如:新特性,缓存等带来的问题。
  • 如果属于单纯的负载增加,Pinterest会分割数据库,并告诉应用程序该在何处寻找新的节点。
  • 在分割数据库之前,Pinterest会给这些主节点加入一些从属节点。然后置换应用程序代码以匹配新的数据库,在过渡的几分钟之内,数据会同时写入到新旧节点,过渡结束后将切断节点之间的通道。

ID结构

  • 一共64位
  • 分片ID:16位
  • Type:10位—— Board、User或者其它对象类型
  • 本地ID——余下的位数用于表中ID,使用MySQL自动递增。
  • Twitter使用一个映射表来为物理主机映射ID,这将需要备份;鉴于Pinterest使用AWS和MySQL查询,这个过程大约需要3毫秒。Pinterest并没有让这个额外的中间层参与工作,而是将位置信息构建在ID里。
  • 用户被随机分配在分片中间。
  • 每个用户的所有数据(pin、board等)都存放在同一个分片中,这将带来巨大的好处,避免了跨分片的查询可以显著的增加查询速度。
  • 每个board都与用户并列,这样board可以通过一个数据库处理。
  • 分片ID足够65536个分片使用,但是开始Pinterest只使用了4096个,这允许他们轻易的进行横向扩展。一旦用户数据库被填满,他们只需要增加额外的分片,然后让新用户写入新的分片就可以了。

查找

  • 如果存在50个查找,举个例子,他们将ID分割且并行的运行查询,那么延时将达到最高。
  • 每个应用程序都有一个配置文件,它将给物理主机映射一个分片范围。
  • “sharddb001a”: : (1, 512)
  • “sharddb001b”: : (513, 1024)——主要备份主节点
  • 如果你想查找一个ID坐落在sharddb003a上的用户:
  • 将ID进行分解
  • 在分片映射中执行查找
  • 连接分片,在数据库中搜寻类型。并使用本地ID去寻找这个用户,然后返回序列化数据。

对象和映射

  • 所有数据都是对象(pin、board、user、comment)或者映射(用户由baord,pin有like)。
  • 针对对象,每个本地ID都映射成MySQL Blob。开始时Blob使用的是JSON格式,之后会给转换成序列化的Thrift。
  • 对于映射来说,这里有一个映射表。你可以为用户读取board,ID包含了是时间戳,这样就可以体现事件的顺序。
  • 同样还存在反向映射,多表对多表,用于查询有哪些用户喜欢某个pin这样的操作。
  • 模式的命名方案是:noun_verb_noun: user_likes_pins, pins_like_user。
  • 只能使用主键或者是索引查找(没有join)。
  • 数据不会向集群中那样跨数据的移动,举个例子:如果某个用户坐落在20分片上,所有他数据都会并列存储,永远不会移动。64位ID包含了分片ID,所以它不可能被移动。你可以移动物理数据到另一个数据库,但是它仍然与相同分片关联。
  • 所有的表都存放在分片上,没有特殊的分片,当然用于检测用户名冲突的巨型表除外。
  • 不需要改变模式,一个新的索引需要一个新的表。
  • 因为键对应的值是blob,所以你不需要破坏模式就可以添加字段。因为blob有不同的版本,所以应用程序将检测它的版本号并且将新记录转换成相应的格式,然后写入。所有的数据不需要立刻的做格式改变,可以在读的时候进行更新。
  • 巨大的胜利,因为改变表格需要在上面加几个小时甚至是几天的锁。如果你需要一个新的索引,你只需要建立一张新的表格,并填入内容;在不需要的时候,丢弃就好。

呈现一个用户文件界面

  1. 从URL中取得用户名,然后到单独的巨型数据库中查询用户的ID。
  2. 获取用户ID,并进行拆分
  3. 选择分片,并进入
  4. SELECT body from users WHERE id = <local_user_id>
  5. SELECT board_id FROM user_has_boards WHERE user_id=<user_id>
  6. SELECT body FROM boards WHERE id IN (<boards_ids>)
  7. SELECT pin_id FROM board_has_pins WHERE board_id=<board_id>
  8. SELECT body FROM pins WHERE id IN (pin_ids)
  9. 所有调用都在缓存中进行(Memcache或者Redis),所以在实践中并没有太多连接数据库的后端操作。

脚本相关

  1. 当你过渡到一个分片架构,你拥有两个不同的基础设施——没有进行分片的旧系统和进行分片的新系统。脚本成为了新旧系统之间数据传输的桥梁。
  2. 移动5亿的pin、16亿的follower行等。
  3. 不要轻视项目中的这一部分,Pinterest原认为只需要2个月就可以完成数据的安置,然而他们足足花了4至5个月时间,别忘了期间他们还冻结了一项特性。
  4. 应用程序必须同时对两个系统插入数据。
  5. 一旦确认所有的数据都在新系统中就位,就可以适当的增加负载来测试新后端。
  6. 建立一个脚本农场,雇佣更多的工程师去加速任务的完成。让他们做这些表格的转移工作。
  7. 设计一个Pyres副本,一个到GitHub Resque队列的Python的接口,这个队列建立在Redis之上。支持优先级和重试,使用Pyres取代Celery和RabbitMQ更是让他们受益良多。
  8. 处理中会产生大量的错误,用户可能会发现类似丢失board的错误;必须重复的运行任务,以保证在数据的处理过程中不会出现暂时性的错误。

开发相关

  • 开始尝试只给开发者开放系统的一部分——他们每个人都拥有自己的MySQL服务器等,但是事情改变的太快,以至于这个模式根本无法实行。
  • 转变成Facebook模式,每个人都可以访问所有东西,所以不得不非常小心。

未来的方向

  • 基于服务的架构
  • 当他们发现大量的数据库负载,他们开始布置大量的应用程序服务器和一些其它的服务器,所有这些服务器都连接至MySQL和Memcache。这意味着在Memcache上将存在3万的连接,这些连接将占用几个G的内存,同时还会产生大量的Memcache守护进程。
  • 为了解决这个问题,将这些工作转移到了一个服务架构。比如:使用一个follower服务,这个服务将专注处理follower查询。这将接下30台左右的主机去连接数据库和缓存,从而减少了连接的数量。
  • 对功能进行隔离,各司其职。让一个服务的开发者不能访问其它的服务,从而杜绝安全隐患。

学到的知识

  1. 为了应对未来的问题,让其保持简单。
  2. 让其变的有趣。只要应用程序还在使用,就会有很多的工程师加入,过于复杂的系统将会让工作失去乐趣。让架构保持简单就是大的胜利,新的工程师从入职的第一周起就可以对项目有所贡献。
  3. 当你把事物用至极限时,这些技术都会以各自不同的方式发生故障。
  4. 如果你的架构应对增长所带来的问题时,只需要简单的投入更多的主机,那么你的架构含金量十足。
  5. 集群管理算法本身就用于处理SPOF,如果存在漏洞的话可能就会影响到每个节点。
  6. 为了快速的增长,你需要为每次负载增加的数据进行均匀分配。
  7. 在节点间传输的数据越少,你的架构越稳定。这也是他们弃集群而选择分片的原因。
  8. 一个面向服务的架构规则。拆分功能,可以帮助减少连接、组织团队、组织支持以及提升安全性。
  9. 搞明白自己究竟需要什么。为了匹配愿景,不要怕丢弃某些技术,甚至是整个系统的重构。
  10. 不要害怕丢失一点数据。将用户数据放入内存,定期的进行持久化。失去的只是几个小时的数据,但是换来的却是更简单、更强健的系统!

原文链接: Scaling Pinterest – From 0 To 10s Of Billions Of Page Views A Month In Two Years (编译/仲浩 审校/王旭东)

from:http://www.csdn.net/article/2013-04-16/2814902-how-pinterest-scaling-0-to-billions-pv

消息队列设计精要

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。

本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面。过程中我们会参考这些成熟消息队列的很多重要思想。

本文首先会阐述什么时候你需要一个消息队列,然后以Push模型为主,从零开始分析设计一个消息队列时需要考虑到的问题,如RPC、高可用、顺序和重复消息、可靠投递、消费关系解析等。

也会分析以Kafka为代表的pull模型所具备的优点。最后是一些高级主题,如用批量/异步提高性能、pull模型的系统设计理念、存储子系统的设计、流量控制的设计、公平调度的实现等。其中最后四个方面会放在下篇讲解。

何时需要消息队列

当你需要使用消息队列时,首先需要考虑它的必要性。可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。

解耦

解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。

比如在美团旅游,我们有一个产品中心,产品中心上游对接的是主站、移动后台、旅游供应链等各个数据源;下游对接的是筛选系统、API系统等展示系统。当上游的数据发生变更的时候,如果不使用消息系统,势必要调用我们的接口来更新数据,就特别依赖产品中心接口的稳定性和处理能力。但其实,作为旅游的产品中心,也许只有对于旅游自建供应链,产品中心更新成功才是他们关心的事情。而对于团购等外部系统,产品中心更新成功也好、失败也罢,并不是他们的职责所在。他们只需要保证在信息变更的时候通知到我们就好了。

而我们的下游,可能有更新索引、刷新缓存等一系列需求。对于产品中心来说,这也不是我们的职责所在。说白了,如果他们定时来拉取数据,也能保证数据的更新,只是实时性没有那么强。但使用接口方式去更新他们的数据,显然对于产品中心来说太过于“重量级”了,只需要发布一个产品ID变更的通知,由下游系统来处理,可能更为合理。

再举一个例子,对于我们的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。

最终一致性

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。

业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。

以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。

然而,这个过程中存在很多可能的意外:

  1. A扣钱成功,调用B加钱接口失败。
  2. A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。
  3. A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。

可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨VM的一致性问题,从技术的角度讲通用的解决方案是:

  1. 强一致性,分布式事务,但落地太难且成本太高,后文会具体提到。
  2. 最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
  3. 回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。
  4. 整个这个模型依然可以基于RPC来做,但可以抽象成一个统一的模型,基于消息队列来做一个“企业总线”。
  5. 具体来说,本地事务维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。
  6. broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。
  7. 我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。

最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。

像Kafka一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

广播

消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。

比如本文开始提到的产品中心发布产品变更的消息,以及景点库很多去重更新的消息,可能“关心”方有很多个,但产品中心和景点库只需要发布变更消息即可,谁关心谁接入。

错峰与流控

试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。

这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。

对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。

支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。

当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。

如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

如何设计一个消息队列

综述

我们现在明确了消息队列的使用场景,下一步就是如何设计实现一个消息队列了。

 

基于消息的系统模型,不一定需要broker(消息队列服务端)。市面上的的Akka(actor模型)、ZeroMQ等,其实都是基于消息的系统设计范式,但是没有broker。

我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:

  1. 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
  2. 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
  3. 掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。

一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。

利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。

之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。

为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。

在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。

下面我们会以设计消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来具体分析设计实现一个消息队列时的方方面面。

实现队列基本功能

RPC通信协议

刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然需要消费端最终做消费确认的情况是三次RPC。既然是RPC,就必然牵扯出一系列话题,什么负载均衡啊、服务发现啊、通信协议啊、序列化协议啊,等等。在这一块,我的强烈建议是不要重复造轮子。利用公司现有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定义的框架也好。因为消息队列的RPC,和普通的RPC没有本质区别。当然了,自主利用Memchached或者Redis协议重新写一套RPC框架并非不可(如MetaQ使用了自己封装的Gecko NIO框架,卡夫卡也用了类似的协议)。但实现成本和难度无疑倍增。排除对效率的极端要求,都可以使用现成的RPC框架。

简单来讲,服务端提供两个RPC服务,一个用来接收消息,一个用来确认消息收到。并且做到不管哪个server收到消息和确认消息,结果一致即可。当然这中间可能还涉及跨IDC的服务的问题。这里和RPC的原则是一致的,尽量优先选择本机房投递。你可能会问,如果producer和consumer本身就在两个机房了,怎么办?首先,broker必须保证感知的到所有consumer的存在。其次,producer尽量选择就近的机房就好了。

高可用

其实所有的高可用,是依赖于RPC和存储的高可用来做的。先来看RPC的高可用,美团的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服务自动发现,负载均衡等功能。而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。

那么怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。就算有单点故障,其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。存储系统本身的可用性我们不需要操太多心,放心大胆的交给DBA们吧!

对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。需要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且需要做数据的同步,关于这块HA的细节,可以参考下篇pull模型消息系统设计。

服务端承载消息堆积的能力

消息到达服务端如果不经过任何处理就到接收者了,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。

只是这个存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。

持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。

但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。

市面上的消息队列普遍两种形式都支持。当然具体的场景还要具体结合公司的业务来看。

存储子系统的选择

我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上,从速度来看,文件系统>分布式KV(持久化)>分布式文件系统>数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择,如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。

但是DB受制于IOPS,如果要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。整体上可以采用数据文件+索引文件的方式处理,具体这块的设计比较复杂,可以参考下篇的存储子系统设计。

分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。

消费关系解析

现在我们的消息队列初步具备了转储消息的能力。下面一个重要的事情就是解析发送接收关系,进行正确的消息投递了。

市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。抛开现象看本质,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。当然,对于互联网的大部分应用来说,组间广播、组内单播是最常见的情形。

消息需要通知到多个业务集群,而一个业务集群内有很多台机器,只要一台机器消费这个消息就可以了。

当然这不是绝对的,很多时候组内的广播也是有适用场景的,如本地缓存的更新等等。另外,消费关系除了组内组间,可能会有多级树状关系。这种情况太过于复杂,一般不列入考虑范围。所以,一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址+端口),则广播。

至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如config server、zookeeper等。维护广播关系所要做的事情基本是一致的:

  1. 发送关系的维护。
  2. 发送关系变更时的通知。

队列高级特性设计

上面都是些消息队列基本功能的实现,下面来看一些关于消息队列特性相关的内容,不管可靠投递/消息丢失与重复以及事务乃至于性能,不是每个消息队列都会照顾到,所以要依照业务的需求,来仔细衡量各种特性实现的成本,利弊,最终做出最为合理的设计。

可靠投递(最终一致性)

这是个激动人心的话题,完全不丢消息,究竟可不可能?答案是,完全可能,前提是消息可能会重复,并且,在异常情况下,要接受消息的延迟。

方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。

具体来说:

  1. producer往broker发送消息之前,需要做一次落地。
  2. 请求到server后,server确保数据落地后再告诉客户端发送成功。
  3. 支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。

对于各种不确定(超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到),其实对于发送方来说,都是一件事情,就是消息没有送达。

重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦,你必须要面对一个。好在消息重复还有处理的机会,消息丢失再想找回就难了。

Anyway,作为一个成熟的消息队列,应该尽量在各个环节减少重复投递的可能性,不能因为重复有解决方案就放纵的乱投递。

最后说一句,不是所有的系统都要求最终一致性或者可靠投递,比如一个论坛系统、一个招聘系统。一个重复的简历或话题被发布,可能比丢失了一个发布显得更让用户无法接受。不断重复一句话,任何基础组件要服务于业务场景。

消费确认

当broker把消息投递给消费者后,消费者可以立即响应我收到了这个消息。但收到了这个消息只是第一步,我能不能处理这个消息却不一定。或许因为消费能力的问题,系统的负荷已经不能处理这个消息;或者是刚才状态机里面提到的消息不是我想要接收的消息,主动要求重发。

把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。所以,允许消费者主动进行消费确认是必要的。当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。

对于正确消费ack的,没什么特殊的。但是对于reject和error,需要特别说明。reject这件事情,往往业务方是无法感知到的,系统的流量和健康状况的评估,以及处理能力的评估是一件非常复杂的事情。举个极端的例子,收到一个消息开始build索引,可能这个消息要处理半个小时,但消息量却是非常的小。所以reject这块建议做成滑动窗口/线程池类似的模型来控制,

消费能力不匹配的时候,直接拒绝,过一段时间重发,减少业务的负担。

但业务出错这件事情是只有业务方自己知道的,就像上文提到的状态机等等。这时应该允许业务方主动ack error,并可以与broker约定下次投递的时间。

重复消息和顺序消息

上文谈到重复消息是不可能100%避免的,除非可以允许丢失,那么,顺序消息能否100%满足呢? 答案是可以,但条件更为苛刻:

  1. 允许消息丢失。
  2. 从发送方到服务方到接受者都是单点单线程。

所以绝对的顺序消息基本上是不能实现的,当然在METAQ/Kafka等pull模型的消息队列中,单线程生产/消费,排除消息丢失,也是一种顺序消息的解决方案。

一般来讲,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽量减少重复消息,不保证消息的投递顺序。

谈到重复消息,主要是两个话题:

  1. 如何鉴别消息重复,并幂等的处理重复消息。
  2. 一个消息队列如何尽量减少重复消息的投递。

先来看看第一个话题,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,如果有地方记录这个MessageId,消息到来是能够进行比对就

能完成重复的鉴定。数据库的唯一键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种原因投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是异常情况下才会发生的,毕竟是小众情况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:

  1. 版本号。
  2. 状态机。

版本号

举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。

但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。

每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。

如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。

参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。

如果到来的顺序是21,则先把2存起来,待2到来后,再处理1,这样重复性和顺序性要求就都达到了。

状态机

基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:

  1. 对发送方必须要求消息带业务版本号。
  2. 下游必须存储消息的版本号,对于要严格保证顺序的。

还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。而且必须要对此做出处理。试想一个永不过期的”session”,比如一个物品的状态,会不停流转于上下线。那么中间环节的所有存储

就必须保留,直到在某个版本号之前的版本一个不丢的到来,成本太高。

就刚才的场景看,如果消息没有版本号,该怎么解决呢?业务方只需要自己维护一个状态机,定义各种状态的流转关系。例如,”下线”状态只允许接收”上线”消息,“上线”状态只能接收“下线消息”,如果上线收到上线消息,或者下线收到下线消息,在消息不丢失和上游业务正确的前提下。要么是消息发重了,要么是顺序到达反了。这时消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发一定要有次数限制,比如5次,避免死循环,就解决了。

举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。

那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。

此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。

中间件对于重复消息的处理

回归到消息队列的话题来讲。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该做的、哪些是消息队列不该做业务方处理的呢?其实这里没有一个完全严格的定义,但回到我们的出发点,我们保证不丢失消息的情况下尽量少重复消息,消费顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,我们要做的是减少消息发送的重复。

我们无法定义业务方的业务版本号/状态机,如果API里强制需要指定版本号,则显得过于绑架客户了。况且,在消费方维护这么多状态,就涉及到一个消费方的消息落地/多机间的同步消费状态问题,复杂度指数级上升,而且只能解决部分问题。

减少重复消息的关键步骤:

  1. broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。
  2. 对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。

事务

持久性是事务的一个特性,然而只满足持久性却不一定能满足事务的特性。还是拿扣钱/加钱的例子讲。满足事务的一致性特征,则必须要么都不进行,要么都能成功。

解决方案从大方向上有两种:

  1. 两阶段提交,分布式事务。
  2. 本地事务,本地落地,补偿发送。

分布式事务存在的最大问题是成本太高,两阶段提交协议,对于仲裁down机或者单点故障,几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用,没有办法承受这么高的网络延迟,系统复杂性。

并且成熟的分布式事务一定构建与比较靠谱的商用DB和商用中间件上,成本也太高。

那如何使用本地事务解决分布式事务的问题呢?以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操作同一个事务里,将消息插入本地数据库。如果消息入库失败,则业务回滚;如果消息入库成功,事务提交。

然后发送消息(注意这里可以实时发送,不需要等定时任务检出,以提高消息实时性)。以后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就一直靠定时任务重试。

这里有一个关键的点,本地事务做的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。这里很多人容易混淆,如果是后者,无疑是事务嵌套RPC,是大忌,会有长事务死锁等各种风险。

而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理损坏除外)。而消息只要投递到服务端确认后本地才做删除,就完成了producer->broker的可靠投递,并且当消息存储异常时,业务也是可以回滚的。

本地事务存在两个最大的使用障碍:

  1. 配置较为复杂,“绑架”业务方,必须本地数据库实例提供一个库表。
  2. 对于消息延迟高敏感的业务不适用。

话说回来,不是每个业务都需要强事务的。扣钱和加钱需要事务保证,但下单和生成短信却不需要事务,不能因为要求发短信的消息存储投递失败而要求下单业务回滚。所以,一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。业务方只需要使用@Transactional标签即可。

性能相关

异步/同步

首先澄清一个概念,异步,同步和oneway是三件事。异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;同步是需要当时关心

的结果的;而oneway是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。

回归来看,任何的RPC都是存在客户端异步与服务端异步的,而且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。

对于客户端来说,同步与异步主要是拿到一个Result,还是Future(Listenable)的区别。实现方式可以是线程池,NIO或者其他事件机制,这里先不展开讲。服务端异步可能稍微难理解一点,这个是需要RPC协议支持的。参考servlet 3.0规范,服务端可以吐一个future给客户端,并且在future done的时候通知客户端。整个过程可以参考下面的代码:

客户端同步服务端异步。

 

Future<Result> future = request(server);//server立刻返回future

synchronized(future){

while(!future.isDone()){

 future.wait();//server处理结束后会notify这个future,并修改isdone标志

}

}

return future.get();

 

客户端同步服务端同步。

Result result = request(server);

客户端异步服务端同步(这里用线程池的方式)。

Future<Result> future = executor.submit(new Callable(){public void call<Result>(){
    result = request(server);
}})
return future;

客户端异步服务端异步。

Future<Result> future = request(server);//server立刻返回future


return future

上面说了这么多,其实是想让大家脱离两个误区:

  1. RPC只有客户端能做异步,服务端不能。
  2. 异步只能通过线程池。

那么,服务端使用异步最大的好处是什么呢?说到底,是解放了线程和I/O。试想服务端有一堆I/O等待处理,如果每个请求都需要同步响应,每条消息都需要结果立刻返回,那么就几乎没法做I/O合并

(当然接口可以设计成batch的,但可能batch发过来的仍然数量较少)。而如果用异步的方式返回给客户端future,就可以有机会进行I/O的合并,把几个批次发过来的消息一起落地(这种合并对于MySQL等允许batch insert的数据库效果尤其明显),并且彻底释放了线程。不至于说来多少请求开多少线程,能够支持的并发量直线提高。

来看第二个误区,返回future的方式不一定只有线程池。换句话说,可以在线程池里面进行同步操作,也可以进行异步操作,也可以不使用线程池使用异步操作(NIO、事件)。

回到消息队列的议题上,我们当然不希望消息的发送阻塞主流程(前面提到了,server端如果使用异步模型,则可能因消息合并带来一定程度上的消息延迟),所以可以先使用线程池提交一个发送请求,主流程继续往下走。

但是线程池中的请求关心结果吗?Of course,必须等待服务端消息成功落地,才算是消息发送成功。所以这里的模型,准确地说事客户端半同步半异步(使用线程池不阻塞主流程,但线程池中的任务需要等待server端的返回),server端是纯异步。客户端的线程池wait在server端吐回的future上,直到server端处理完毕,才解除阻塞继续进行。

总结一句,同步能够保证结果,异步能够保证效率,要合理的结合才能做到最好的效率。

批量

谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:

  1. 攒够了一定数量。
  2. 到达了一定时间。
  3. 队列里有新的数据到来。

对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。

在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息
   }
});
public void send(Message message){
    queue.offer(message);
    executor.submit(task)
}

这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
volatile long last = System.currentMills();
Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
   flush();
},500,500,TimeUnits.MILLS);
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。
   }
});
public void send(Message message){
    last = System.currentMills();
    queue.offer(message);
    flush();
}
private void flush(){
 if(queue.size>200||System.currentMills()-last>200){
       executor.submit(task)
  }
}

相反对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想,既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限。

具体说来,在上文的submit之前,多判断一个时间和数量,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。

最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:

  1. 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
  2. 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。

push还是pull

上文提到的消息队列,大多是针对push模型的设计。现在市面上有很多经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。

我们简要分析下push和pull模型各自存在的利弊。

慢消费

慢消费无疑是push模型最大的致命伤,穿成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。

反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。

消息延迟与忙等

这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。

但等待多久就很难判定了。你可能会说,我可以有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生,

可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧?

当然也不是说延迟就没有解决方案了,业界较成熟的做法是从短时间开始(不会对broker有太大负担),然后指数级增长等待。比如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。

即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。

在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~

顺序消息

如果push模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消费确认后才能发下一条消息,这对于本身堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。

反观pull模式,如果想做到全局顺序消息,就相对容易很多:

  1. producer对应partition,并且单线程。
  2. consumer对应partition,消费确认(或批量确认),继续消费即可。

所以对于日志push送这种最好全局有序,但允许出现小误差的场景,pull模式非常合适。如果你不想看到通篇乱套的日志~~

Anyway,需要顺序消息的场景还是比较有限的而且成本太高,请慎重考虑。

总结

本文从为何使用消息队列开始讲起,然后主要介绍了如何从零开始设计一个消息队列,包括RPC、事务、最终一致性、广播、消息确认等关键问题。并对消息队列的push、pull模型做了简要分析,最后从批量和异步角度,分析了消息队列性能优化的思路。下篇会着重介绍一些高级话题,如存储系统的设计、流控和错峰的设计、公平调度等。希望通过这些,让大家对消息队列有个提纲挈领的整体认识,并给自主开发消息队列提供思路。另外,本文主要是源自自己在开发消息队列中的思考和读源码时的体会,比较不”官方”,也难免会存在一些漏洞,欢迎大家多多交流。

后续我们还会推出消息队列设计高级篇,内容会涵盖以下方面:

  • pull模型消息系统设计理念
  • 存储子系统设计
  • 流量控制
  • 公平调度

敬请期待哦~

作者简介

王烨,现在是美团旅游后台研发组的程序猿,之前曾经在百度、去哪和优酷工作过,专注Java后台开发。对于网络编程和并发编程具有浓厚的兴趣,曾经做过一些基础组件,也翻过一些源码,属于比较典型的宅男技术控。期待能够与更多知己,在coding的路上并肩前行~

from:https://zhuanlan.zhihu.com/p/21649950

 

分布式服务化系统一致性的“最佳实干”

1 背景

一致性是一个抽象的、具有多重含义的计算机术语,在不同应用场景下,有不同的定义和含义。在传统的IT时代,一致性通常指强一致性,强一致性通常体现在你中有我、我中有你、浑然一体;而在互联网时代,一致性的含义远远超出了它原有的含义,在我们讨论互联网时代的一致性之前,我们先了解一下互联网时代的特点,互联网时代信息量巨大、需要计算能力巨大,不但对用户响应速度要求快,而且吞吐量指标也要向外扩展(既:水平伸缩),于是单节点的服务器无法满足需求,服务节点开始池化,想想那个经典的故事,一只筷子一折就断,一把筷子怎么都折不断,可见人多力量大的思想是多么的重要,但是人多也不一定能解决所有事情,还得进行有序、合理的分配任务,进行有效的管理,于是互联网时代谈论最多的话题就是拆分,拆分一般分为“水平拆分”和“垂直拆分”(大家不要对应到数据库或者缓存拆分,这里主要表达一种逻辑)。这里,“水平拆分”指的是同一个功能由于单机节点无法满足性能需求,需要扩展成为多节点,多个节点具有一致的功能,组成一个服务池,一个节点服务一部分的请求量,团结起来共同处理大规模高并发的请求量。“垂直拆分”指的是按照功能拆分,秉着“专业的人干专业的事儿”的原则,把一个复杂的功能拆分到多个单一的简单的元功能,不同的元功能组合在一起,和未拆分前完成的功能是一致的,由于每个元功能职责单一、功能简单,让维护和变更都变得更简单、安全,更易于产品版本的迭代,在这样的一个互联网的时代和环境,一致性指分布式服务化系统之间的弱一致性,包括应用系统一致性和数据一致性。

无论是水平拆分还是垂直拆分,都解决了特定场景下的特定问题,凡事有好的一面,都会有坏的一面,拆分后的系统或者服务化的系统最大的问题就是一致性问题,这么多个具有元功能的模块,或者同一个功能池中的多个节点之间,如何保证他们的信息是一致的、工作步伐是一致的、状态是一致的、互相协调有序的工作呢?

本文根据作者在互联网企业的实际项目经验,对服务化系统中最难解决的一致性问题进行研究和探讨,试图从实践经验中找到规律,抽象出模式,分享给大家,希望对大家的项目实施有所帮助,在对实践的总结中也会对相关的一致性术语做最朴实的解释,希望能帮助大家彻底理解一致性的本质,并能将其应用到实践,解决读者现实中遇到的服务化系统的一致性问题,本文使用理论与实践相结合的方法,突出在实践中解决问题的模式,因此叫做《分布式服务化系统一致性的“最佳实干”》。

2 问题

本节列举不一致会导致的种种问题,这也包括一例生活中的问题。

案例1:买房

假如你想要享受生活的随意,只想买个两居,不想让房贷有太大压力,而你媳妇却想要买个三居,还得带花园的,那么你们就不一致了,不一致导致生活不愉快、不协调,严重情况下还会吵架,可见生活中的不一致问题影响很大。

案例2:转账

转账是经典的不一致案例,设想一下银行为你处理一笔转账,扣减你账户上的余额,然后增加别人账户的余额;如果扣减你的账户余额成功,增加别人账户余额失败,那么你就会损失这笔资金。反过来,如果扣减你的账户余额失败,增加别人账户余额成功,那么银行就会损失这笔资金,银行需要赔付。对于资金处理系统来说,上面任何一种场景都是不允许发生的,一旦发生就会有资金损失,后果是不堪设想的,严重情况会让一个公司瞬间倒闭,可参考案例

案例3:下订单和扣库存

电商系统中也有一个经典的案例,下订单和扣库存如何保持一致,如果先下订单,扣库存失败,那么将会导致超卖;如果下订单没有成功,扣库存成功,那么会导致少卖。两种情况都会导致运营成本的增加,严重情况下需要赔付。

案例4:同步超时

服务化的系统间调用常常因为网络问题导致系统间调用超时,即使是网络很好的机房,在亿次流量的基数下,同步调用超时也是家常便饭。系统A同步调用系统B超时,系统A可以明确得到超时反馈,但是无法确定系统B是否已经完成了预定的功能或者没有完成预定的功能。于是,系统A就迷茫了,不知道应该继续做什么,如何反馈给使用方。(曾经的一个B2B产品的客户要求接口超时重新通知他们,这个在技术上是难以实现的,因为服务器本身可能并不知道自己超时,可能会继续正常的返回数据,只是客户端并没有接受到结果罢了,因此这不是一个合理的解决方案)。

案例5:异步回调超时

此案例和上一个同步超时案例类似,不过这个场景使用了异步回调,系统A同步调用系统B发起指令,系统B采用受理模式,受理后则返回受理成功,然后系统B异步通知系统A。在这个过程中,如果系统A由于某种原因迟迟没有收到回调结果,那么两个系统间的状态就不一致,互相认知不同会导致系统间发生错误,严重情况下会影响核心事务,甚至会导致资金损失。

案例6:掉单

分布式系统中,两个系统协作处理一个流程,分别为对方的上下游,如果一个系统中存在一个请求,通常指订单,另外一个系统不存在,则导致掉单,掉单的后果很严重,有时候也会导致资金损失。

案例7:系统间状态不一致

这个案例与上面掉单案例类似,不同的是两个系统间都存在请求,但是请求的状态不一致。

案例8:缓存和数据库不一致

交易相关系统基本离不开关系型数据库,依赖关系型数据库提供的ACID特性(后面介绍),但是在大规模高并发的互联网系统里,一些特殊的场景对读的性能要求极高,服务于交易的数据库难以抗住大规模的读流量,通常需要在数据库前垫缓存,那么缓存和数据库之间的数据如何保持一致性?是要保持强一致呢还是弱一致性呢?

案例9:本地缓存节点间不一致

一个服务池上的多个节点为了满足较高的性能需求,需要使用本地缓存,使用了本地缓存,每个节点都会有一份缓存数据的拷贝,如果这些数据是静态的、不变的,那永远都不会有问题,但是如果这些数据是半静态的或者常被更新的,当被更新的时候,各个节点更新是有先后顺序的,在更新的瞬间,各个节点的数据是不一致的,如果这些数据是为某一个开关服务的,想象一下重复的请求走进了不同的节点(在failover或者补偿导致的场景下,重复请求是一定会发生的,也是服务化系统必须处理的),一个请求走了开关打开的逻辑,同时另外一个请求走了开关关闭的逻辑,这导致请求被处理两次,最坏的情况下会导致灾难性的后果,就是资金损失。

案例10:缓存数据结构不一致

这个案例会时有发生,某系统需要种某一数据结构的缓存,这一数据结构有多个数据元素组成,其中,某个数据元素都需要从数据库中或者服务中获取,如果一部分数据元素获取失败,由于程序处理不正确,仍然将不完全的数据结构存入缓存,那么缓存的消费者消费的时候很有可能因为没有合理处理异常情况而出错。

3 模式

3.1 生活中不一致问题的解决

大家回顾一下上一节列举的生活中的案例1-买房,如果置身事外来看,解决这种不一致的办法有两个,一个是避免不一致的发生,如果已经是媳妇了就不好办了:),还有一种方法就是慢慢的补偿,先买个两居,然后慢慢的等资金充裕了再换三居,买比特币赚了再换带花园的房子,于是问题最终被解决了,最终大家处于一致的状态,都开心了。这样可以解决案例1的问题,很自然由于有了过渡的方法,问题在不经意间就消失了,可见“过渡”也是解决一致性问题的一个模式。

从案例1的解决方案来看,我们要解决一致性问题,一个最直接最简单的方法就是保持强一致性,对于案例1的情况,尽量避免在结婚前两个人能够互相了解达成一致,避免不一致问题的发生;不过有些事情事已至此,发生了就是发生了,出现了不一致的问题,我们应该考虑去补偿,尽最大的努力从不一致状态修复到一致状态,避免损失全部或者一部分,也不失为一个好方法。

因此,避免不一致是上策,出现了不一致及时发现及时修复是中策,有问题不积极解决留给他人解决是下策。

3.2 酸碱平衡理论

ACID在英文中的意思是“酸”,BASE的意识是“碱”,这一段讲的是“酸碱平衡”的故事。

1. ACID(酸)

如何保证强一致性呢?计算机专业的童鞋在学习关系型数据库的时候都学习了ACID原理,这里对ACID做个简单的介绍。如果想全面的学习ACID原理,请参考ACID

关系型数据库天生就是解决具有复杂事务场景的问题,关系型数据库完全满足ACID的特性。

ACID指的是:

  • A: Atomicity,原子性
  • C: Consistency,一致性
  • I: Isolation,隔离性
  • D: Durability,持久性

具有ACID的特性的数据库支持强一致性,强一致性代表数据库本身不会出现不一致,每个事务是原子的,或者成功或者失败,事物间是隔离的,互相完全不影响,而且最终状态是持久落盘的,因此,数据库会从一个明确的状态到另外一个明确的状态,中间的临时状态是不会出现的,如果出现也会及时的自动的修复,因此是强一致的。

3个典型的关系型数据库Oracle、Mysql、Db2都能保证强一致性,Oracle和Mysql使用多版本控制协议实现,而DB2使用改进的两阶段提交协议来实现。

如果你在为交易相关系统做技术选型,交易的存储应该只考虑关系型数据库,对于核心系统,如果需要较好的性能,可以考虑使用更强悍的硬件,这种向上扩展(升级硬件)虽然成本较高,但是是最简单粗暴有效的方式,另外,Nosql完全不适合交易场景,Nosql主要用来做数据分析、ETL、报表、数据挖掘、推荐、日志处理等非交易场景。

前面提到的案例2-转账案例3-下订单和扣库存都可以利用关系型数据库的强一致性解决。

然而,前面提到,互联网项目多数具有大规模高并发的特性,必须应用拆分的理念,对高并发的压力采取“大而化小、小而化了”的方法,否则难以满足动辄亿级流量的需求,即使使用关系型数据库,单机也难以满足存储和TPS上的需求。为了保证案例2-转账可以利用关系型数据库的强一致性,在拆分的时候尽量的把转账相关的账户放入一个数据库分片,对于案例3,尽量的保证把订单和库存放入同一个数据库分片,这样通过关系型数据库自然就解决了不一致的问题。

然而,有些时候事与愿违,由于业务规则的限制,无法将相关的数据分到同一个数据库分片,这个时候我们就需要实现最终一致性。

对于案例2-转账场景,假设账户数量巨大,对账户存储进行了拆分,关系型数据库一共分了8个实例,每个实例8个库,每个库8个表,共512张表,假如要转账的两个账户正好落在了一个库里,那么可以依赖关系型数据库的事务保持强一致性。

如果要转账的两个账户正好落在了不同的库里,转账操作是无法封装在同一个数据库事务中的,这个时候会发生一个库的账户扣减余额成功,另外一个库的账户增加余额失败的情况。

对于这种情况,我们需要继续探讨解决之道,CAP原理和BASE原理,BASE原理通过记录事务的中间的临时状态,实现最终一致性。

2. CAP(帽子理论)

如果想深入的学习CAP理论,请参考CAP

由于对系统或者数据进行了拆分,我们的系统不再是单机系统,而是分布式系统,针对分布式系的帽子理论包含三个元素:

  • C:Consistency,一致性, 数据一致更新,所有数据变动都是同步的
  • A:Availability,可用性, 好的响应性能,完全的可用性指的是在任何故障模型下,服务都会在有限的时间处理响应
  • P:Partition tolerance,分区容错性,可靠性

帽子理论证明,任何分布式系统只可同时满足二点,没法三者兼顾。关系型数据库由于关系型数据库是单节点的,因此,不具有分区容错性,但是具有一致性和可用性,而分布式的服务化系统都需要满足分区容错性,那么我们必须在一致性和可用性中进行权衡,具体表现在服务化系统处理的异常请求在某一个时间段内可能是不完全的,但是经过自动的或者手工的补偿后,达到了最终的一致性。

3. BASE(碱)

BASE理论解决CAP理论提出了分布式系统的一致性和可用性不能兼得的问题,如果想全面的学习BASE原理,请参考Eventual consistency

BASE在英文中有“碱”的意思,对应本节开头的ACID在英文中“酸”的意思,基于这两个名词提出了酸碱平衡的结论,简单来说是在不同的场景下,可以分别利用ACID和BASE来解决分布式服务化系统的一致性问题。

BASE模型与ACID模型截然不同,满足CAP理论,通过牺牲强一致性,获得可用性,一般应用在服务化系统的应用层或者大数据处理系统,通过达到最终一致性来尽量满足业务的绝大部分需求。

BASE模型包含个三个元素:

  • BA:Basically Available,基本可用
  • S:Soft State,软状态,状态可以有一段时间不同步
  • E:Eventually Consistent,最终一致,最终数据是一致的就可以了,而不是时时保持强一致

BASE模型的软状态是实现BASE理论的方法,基本可用和最终一致是目标。按照BASE模型实现的系统,由于不保证强一致性,系统在处理请求的过程中,可以存在短暂的不一致,在短暂的不一致窗口请求处理处在临时状态中,系统在做每步操作的时候,通过记录每一个临时状态,在系统出现故障的时候,可以从这些中间状态继续未完成的请求处理或者退回到原始状态,最后达到一致的状态。

案例1-转账为例,我们把用户A给用户B转账分成四个阶段,第一个阶段用户A准备转账,第二个阶段从用户A账户扣减余额,第三个阶段对用户B增加余额,第四个阶段完成转账。系统需要记录操作过程中每一步骤的状态,一旦系统出现故障,系统能够自动发现没有完成的任务,然后,根据任务所处的状态,继续执行任务,最终完成任务,达到一致的最终状态。

在实际应用中,上面这个过程通常是通过持久化执行任务的状态和环境信息,一旦出现问题,定时任务会捞取未执行完的任务,继续未执行完的任务,直到执行完成为止,或者取消已经完成的部分操作回到原始状态。这种方法在任务完成每个阶段的时候,都要更新数据库中任务的状态,这在大规模高并发系统中不会有太好的性能,一个更好的办法是用Write-Ahead Log(写前日志),这和数据库的Bin Log(操作日志)相似,在做每一个操作步骤,都先写入日志,如果操作遇到问题而停止的时候,可以读取日志按照步骤进行恢复,并且继续执行未完成的工作,最后达到一致。写前日志可以利用机械硬盘的追加写而达到较好性能,因此,这是一种专业化的实现方式,多数业务系系统还是使用数据库记录的字段来记录任务的执行状态,也就是记录中间的“软状态”,一个任务的状态流转一般可以通过数据库的行级锁来实现,这比使用Write-Ahead Log实现更简单、更快速。

有了BASE理论作为基础,我们对复杂的分布式事务进行拆解,对其中的每一步骤都记录其状态,有问题的时候可以根据记录的状态来继续执行任务,达到最终的一致,通过这个方法我们可以解决案例2-转账案例3-下订单和扣库存中遇到的问题。

4. 酸碱平衡的总结

  1. 使用向上扩展(强悍的硬件)运行专业的关系型数据库(例如:Oracle或者DB2)能够保证强一致性,钱能解决的问题就不是问题
  2. 如果钱是问题,可以对廉价硬件运行的开源关系型数据库(例如:Mysql)进行分片,将相关的数据分到数据库的同一个片,仍然能够使用关系型数据库保证事务
  3. 如果业务规则限制,无法将相关的数据分到同一个片,就需要实现最终一致性,通过记录事务的软状态(中间状态、临时状态),一旦处于不一致,可以通过系统自动化或者人工干预来修复不一致的情况

3.3 分布式一致性协议

国际开放标准组织Open Group定义了DTS(分布式事务处理模型),模型中包含4个角色:应用程序、事务管理器、资源管理器、通信资源管理器四部分。事务处理器是统管全局的管理者,资源处理器和通信资源处理器是事务的参与者。

J2EE规范也包含此分布式事务处理模型的规范,并在所有的AppServer中进行实现,J2EE规范中定义了TX协议和XA协议,TX协议定义应用程序与事务管理器之间的接口,而XA协议定义了事务管理器与资源处理器之间的接口,在过去,大家使用AppServer,例如:Websphere、Weblogic、Jboss等配置数据源的时候会看见类似XADatasource的数据源,这就是实现了DTS的关系型数据库的数据源。企业级开发JEE中,关系型数据库、JMS服务扮演资源管理器的角色,而EJB容器则扮演事务管理器的角色。

下面我们就介绍两阶段提交协议三阶段提交协议以及阿里巴巴提出的TCC,它们都是根据DTS这一思想演变出来的。

1. 两阶段提交协议

上面描述的JEE的XA协议就是根据两阶段提交来保证事务的完整性,并实现分布式服务化的强一致性。

两阶段提交协议把分布式事务分成两个过程,一个是准备阶段,一个是提交阶段,准备阶段和提交阶段都是由事务管理器发起的,为了接下来讲解方便,我们把事务管理器称为协调者,把资管管理器称为参与者。

两阶段如下:

  1. 准备阶段:协调者向参与者发起指令,参与者评估自己的状态,如果参与者评估指令可以完成,参与者会写redo或者undo日志(这也是前面提起的Write-Ahead Log的一种),然后锁定资源,执行操作,但是并不提交
  2. 提交阶段:如果每个参与者明确返回准备成功,也就是预留资源和执行操作成功,协调者向参与者发起提交指令,参与者提交资源变更的事务,释放锁定的资源;如果任何一个参与者明确返回准备失败,也就是预留资源或者执行操作失败,协调者向参与者发起中止指令,参与者取消已经变更的事务,执行undo日志,释放锁定的资源

两阶段提交协议成功场景示意图如下:

两阶段提交协议

我们看到两阶段提交协议在准备阶段锁定资源,是一个重量级的操作,并能保证强一致性,但是实现起来复杂、成本较高,不够灵活,更重要的是它有如下致命的问题:

  1. 阻塞:从上面的描述来看,对于任何一次指令必须收到明确的响应,才会继续做下一步,否则处于阻塞状态,占用的资源被一直锁定,不会被释放
  2. 单点故障:如果协调者宕机,参与者没有了协调者指挥,会一直阻塞,尽管可以通过选举新的协调者替代原有协调者,但是如果之前协调者在发送一个提交指令后宕机,而提交指令仅仅被一个参与者接受,并且参与者接收后也宕机,新上任的协调者无法处理这种情况
  3. 脑裂:协调者发送提交指令,有的参与者接收到执行了事务,有的参与者没有接收到事务,就没有执行事务,多个参与者之间是不一致的

上面所有的这些问题,都是需要人工干预处理,没有自动化的解决方案,因此两阶段提交协议在正常情况下能保证系统的强一致性,但是在出现异常情况下,当前处理的操作处于错误状态,需要管理员人工干预解决,因此可用性不够好,这也符合CAP协议的一致性和可用性不能兼得的原理。

2. 三阶段提交协议

三阶段提交协议是两阶段提交协议的改进版本。它通过超时机制解决了阻塞的问题,并且把两个阶段增加为三个阶段:

  1. 询问阶段:协调者询问参与者是否可以完成指令,协调者只需要回答是还是不是,而不需要做真正的操作,这个阶段超时导致中止
  2. 准备阶段:如果在询问阶段所有的参与者都返回可以执行操作,协调者向参与者发送预执行请求,然后参与者写redo和undo日志,执行操作,但是不提交操作;如果在询问阶段任何参与者返回不能执行操作的结果,则协调者向参与者发送中止请求,这里的逻辑与两阶段提交协议的的准备阶段是相似的,这个阶段超时导致成功
  3. 提交阶段:如果每个参与者在准备阶段返回准备成功,也就是预留资源和执行操作成功,协调者向参与者发起提交指令,参与者提交资源变更的事务,释放锁定的资源;如果任何一个参与者返回准备失败,也就是预留资源或者执行操作失败,协调者向参与者发起中止指令,参与者取消已经变更的事务,执行undo日志,释放锁定的资源,这里的逻辑与两阶段提交协议的提交阶段一致

三阶段提交协议成功场景示意图如下:

三阶段提交协议

然而,这里与两阶段提交协议有两个主要的不同:

  1. 增加了一个询问阶段,询问阶段可以确保尽可能早的发现无法执行操作而需要中止的行为,但是它并不能发现所有的这种行为,只会减少这种情况的发生
  2. 在准备阶段以后,协调者和参与者执行的任务中都增加了超时,一旦超时,协调者和参与者都继续提交事务,默认为成功,这也是根据概率统计上超时后默认成功的正确性最大

三阶段提交协议与两阶段提交协议相比,具有如上的优点,但是一旦发生超时,系统仍然会发生不一致,只不过这种情况很少见罢了,好处就是至少不会阻塞和永远锁定资源。

3. TCC

上面两节讲解了两阶段提交协议和三阶段提交协议,实际上他们能解决案例2-转账案例3-下订单和扣库存中的分布式事务的问题,但是遇到极端情况,系统会发生阻塞或者不一致的问题,需要运营或者技术人工解决。无论两阶段还是三阶段方案中都包含多个参与者、多个阶段实现一个事务,实现复杂,性能也是一个很大的问题,因此,在互联网高并发系统中,鲜有使用两阶段提交和三阶段提交协议的场景。

阿里巴巴提出了新的TCC协议,TCC协议将一个任务拆分成Try、Confirm、Cancel,正常的流程会先执行Try,如果执行没有问题,再执行Confirm,如果执行过程中出了问题,则执行操作的逆操Cancel,从正常的流程上讲,这仍然是一个两阶段的提交协议,但是,在执行出现问题的时候,有一定的自我修复能力,如果任何一个参与者出现了问题,协调者通过执行操作的逆操作来取消之前的操作,达到最终的一致状态。

可以看出,从时序上,如果遇到极端情况下TCC会有很多问题的,例如,如果在Cancel的时候一些参与者收到指令,而一些参与者没有收到指令,整个系统仍然是不一致的,这种复杂的情况,系统首先会通过补偿的方式,尝试自动修复的,如果系统无法修复,必须由人工参与解决。

从TCC的逻辑上看,可以说TCC是简化版的三阶段提交协议,解决了两阶段提交协议的阻塞问题,但是没有解决极端情况下会出现不一致和脑裂的问题。然而,TCC通过自动化补偿手段,会把需要人工处理的不一致情况降到到最少,也是一种非常有用的解决方案,根据线人,阿里在内部的一些中间件上实现了TCC模式。

我们给出一个使用TCC的实际案例,在秒杀的场景,用户发起下单请求,应用层先查询库存,确认商品库存还有余量,则锁定库存,此时订单状态为待支付,然后指引用户去支付,由于某种原因用户支付失败,或者支付超时,系统会自动将锁定的库存解锁供其他用户秒杀。

TCC协议使用场景示意图如下:

TCC

总结一下,两阶段提交协议、三阶段提交协议、TCC协议都能保证分布式事务的一致性,他们保证的分布式系统的一致性从强到弱,TCC达到的目标是最终一致性,其中任何一种方法都可以不同程度的解决案例2:转账、案例3:下订单和扣库存的问题,只是实现的一致性的级别不一样而已,对于案例4:同步超时可以通过TCC的理念解决,如果同步调用超时,调用方可以使用fastfail策略,返回调用方的使用方失败的结果,同时调用服务的逆向cancel操作,保证服务的最终一致性。

3.4 保证最终一致性的模式

在大规模高并发服务化系统中,一个功能被拆分成多个具有单一功能的元功能,一个流程会有多个系统的多个元功能组合实现,如果使用两阶段提交协议和三阶段提交协议,确实能解决系统间一致性问题,除了这两个协议带来的自身的问题,这些协议的实现比较复杂、成本比较高,最重要的是性能并不好,相比来看,TCC协议更简单、容易实现,但是TCC协议由于每个事务都需要执行Try,再执行Confirm,略微显得臃肿,因此,在现实的系统中,底线要求仅仅需要能达到最终一致性,而不需要实现专业的、复杂的一致性协议,实现最终一致性有一些非常有效的、简单粗暴的模式,下面就介绍这些模式及其应用场景。

1. 查询模式

任何一个服务操作都需要提供一个查询接口,用来向外部输出操作执行的状态。服务操作的使用方可以通过查询接口,得知服务操作执行的状态,然后根据不同状态来做不同的处理操作。

为了能够实现查询,每个服务操作都需要有唯一的流水号标识,也可使用此次服务操作对应的资源ID来标志,例如:请求流水号、订单号等。

首先,单笔查询操作是必须提供的,我们也鼓励使用单笔订单查询,这是因为每次调用需要占用的负载是可控的,批量查询则根据需要来提供,如果使用了批量查询,需要有合理的分页机制,并且必须限制分页的大小,以及对批量查询的QPS需要有容量评估和流控等。

查询模式的示意图如下:

查询模式

对于案例4:同步超时、案例5:异步回调超时、案例6:掉单、案例7:系统间状态不一致,我们都需要使用查询模式来了解被调用服务的处理情况,来决定下一步做什么:补偿未完成的操作还是回滚已经完成的操作。

2. 补偿模式

有了上面的查询模式,在任何情况下,我们都能得知具体的操作所处的状态,如果整个操作处于不正常的状态,我们需要修正操作中有问题的子操作,这可能需要重新执行未完成的子操作,后者取消已经完成的子操作,通过修复使整个分布式系统达到一致,为了让系统最终一致而做的努力都叫做补偿。

对于服务化系统中同步调用的操作,业务操作发起的主动方在还没有得到业务操作执行方的明确返回或者调用超时,场景可参考案例4:同步超时,这个时候业务发起的主动方需要及时的调用业务执行方获得操作执行的状态,这里使用查询模式,获得业务操作的执行方的状态后,如果业务执行方已经完预设的工作,则业务发起方给业务的使用方返回成功,如果业务操作的执行方的状态为失败或者未知,则会立即告诉业务的使用方失败,然后调用业务操作的逆向操作,保证操作不被执行或者回滚已经执行的操作,让业务的使用方、业务发起的主动方、业务的操作方最终达成一致的状态。

补偿模式的示意图如下:

补偿模式

补偿操作根据发起形式分为:

  1. 自动恢复:程序根据发生不一致的环境,通过继续未完成的操作,或者回滚已经完成的操作,自动来达到一致
  2. 通知运营:如果程序无法自动恢复,并且设计时考虑到了不一致的场景,可以提供运营功能,通过运营手工进行补偿
  3. 通知技术:如果很不巧,系统无法自动回复,又没有运营功能,那必须通过技术手段来解决,技术手段包括走数据库变更或者代码变更来解决,这是最糟的一种场景

3. 异步确保模式

异步确保模式是补偿模式的一个典型案例,经常应用到使用方对响应时间要求并不太高,我们通常把这类操作从主流程中摘除,通过异步的方式进行处理,处理后把结果通过通知系统通知给使用方,这个方案最大的好处能够对高并发流量进行消峰,例如:电商系统中的物流、配送,以及支付系统中的计费、入账等。

实践中,将要执行的异步操作封装后持久入库,然后通过定时捞取未完成的任务进行补偿操作来实现异步确保模式,只要定时系统足够健壮,任何一个任务最终会被成功执行。

异步确保模式的示意图如下:

异步确保模式

对于案例5:异步回调超时,使用的就是异步确保模式,这种情况下对于某个操作,如果迟迟没有收到响应,我们通过查询模式和补偿模式来继续未完成的操作。

4. 定期校对模式

既然我们在系统中实现最终一致性,系统在没有达到一致之前,系统间的状态是不一致的,甚至是混乱的,需要补偿操作来达到一致的目的,但是我们如何来发现需要补偿的操作呢?

在操作的主流程中的系统间执行校对操作,我们可以事后异步的批量校对操作的状态,如果发现不一致的操作,则进行补偿,补偿操作与补偿模式中的补偿操作是一致的。

另外,实现定期校对的一个关键就是分布式系统中需要有一个自始至终唯一的ID,ID的生成请参考SnowFlake

在分布式系统中,全局唯一ID的示意图如下:

唯一ID

一般情况下,生成全局唯一ID有两种方法:

  1. 持久型:使用数据库表自增字段或者Sequence生成,为了提高效率,每个应用节点可以缓存一批次的ID,如果机器重启可能会损失一部分ID,但是这并不会产生任何问题
  2. 时间型:一般由机器号、业务号、时间、单节点内自增ID组成,由于时间一般精确到秒或者毫秒,因此不需要持久就能保证在分布式系统中全局唯一、粗略递增能特点

实践中,为了能在分布式系统中迅速的定位问题,一般的分布式系统都有技术支持系统,它能够跟踪一个请求的调用链,调用链是在二维的维度跟踪一个调用请求,最后形成一个调用树,原理可参考谷歌的论文Dapper, a Large-Scale Distributed Systems Tracing Infrastructure,一个开源的参考实现为pinpoint

在分布式系统中,调用链的示意图如下:

调用链

全局的唯一流水ID可以把一个请求在分布式系统中的流转的路径聚合,而调用链中的spanid可以把聚合的请求路径通过树形结构进行展示,让技术支持人员轻松的发现系统出现的问题,能够快速定位出现问题的服务节点,提高应急效率。

关于订单跟踪、调用链跟踪、业务链跟踪,我们会在后续文章中详细介绍。

在分布式系统中构建了唯一ID,调用链等基础设施,我们很容易对系统间的不一致进行核对,通常我们需要构建第三方的定期核对系统,以第三方的角度来监控服务执行的健康程度。

定期核对系统示意图如下:

定期核对模式

对于案例6:掉单、案例7:系统间状态不一致通常通过定期校对模式发现问题,并通过补偿模式来修复,最后完成系统间的最终一致性。

定期校对模式多应用在金融系统,金融系统由于涉及到资金安全,需要保证百分之百的准确性,所以,需要多重的一致性保证机制,包括:系统间的一致性对账、现金对账、账务对账、手续费对账等等,这些都属于定期校对模式,顺便说一下,金融系统与社交应用在技术上本质的区别在于社交应用在于量大,而金融系统在于数据的准确性。

到现在为止,我们看到通过查询模式、补偿模式、定期核对模式可以解决案例4到案例7的所有问题,对于案例4:同步超时,如果同步超时,我们需要查询状态进行补偿,对于案例5:异步回调超时,如果迟迟没有收到回调响应,我们也会通过查询状态进行补偿,对于案例6:掉单、案例7:系统间状态不一致,我们通过定期核对模式可以保证系统间操作的一致性,避免掉单和状态不一致导致问题。

5. 可靠消息模式

在分布式系统中,对于主流程中优先级比较低的操作,大多采用异步的方式执行,也就是前面提到的异步确保型,为了让异步操作的调用方和被调用方充分的解耦,也由于专业的消息队列本身具有可伸缩、可分片、可持久等功能,我们通常通过消息队列实现异步化,对于消息队列,我们需要建立特殊的设施保证可靠的消息发送以及处理机的幂等等。

消息的可靠发送

消息的可靠发送可以认为是尽最大努力发送消息通知,有两种实现方法:

第一种,发送消息之前,把消息持久到数据库,状态标记为待发送,然后发送消息,如果发送成功,将消息改为发送成功。定时任务定时从数据库捞取一定时间内未发送的消息,将消息发送。

消息发送模式1

第二种,实现方式与第一种类似,不同的是持久消息的数据库是独立的,并不耦合在业务系统中。发送消息之前,先发送一个预消息给某一个第三方的消息管理器,消息管理器将其持久到数据库,并标记状态为待发送,发送成功后,标记消息为发送成功。定时任务定时从数据库捞取一定时间内未发送的消息,回查业务系统是否要继续发送,根据查询结果来确定消息的状态。

消息发送模式2

一些公司把消息的可靠发送实现在了中间件里,通过Spring的注入,在消息发送的时候自动持久消息记录,如果有消息记录没有发送成功,定时会补偿发送。

消息处理器的幂等性

如果我们要保证消息可靠的发送,简单来说,要保证消息一定要发送出去,那么就需要有重试机制,有了重试机制,消息一定会重复,那么我们需要对重复做处理。

处理重复的最佳方式为保证操作的幂等性,幂等性的数学公式为:

f(f(x)) = f(x)

保证操作的幂等性常用的几个方法:

  1. 使用数据库表的唯一键进行滤重,拒绝重复的请求
  2. 使用分布式表对请求进行滤重
  3. 使用状态流转的方向性来滤重,通常使用行级锁来实现(后续在锁相关的文章中详细说明)
  4. 根据业务的特点,操作本身就是幂等的,例如:删除一个资源、增加一个资源、获得一个资源等

6. 缓存一致性模型

大规模高并发系统中一个常见的核心需求就是亿级的读需求,显然,关系型数据库并不是解决高并发读需求的最佳方案,互联网的经典做法就是使用缓存抗读需求,下面有一些使用缓存的保证一致性的最佳实践:

  1. 如果性能要求不是非常的高,尽量使用分布式缓存,而不要使用本地缓存
  2. 种缓存的时候一定种完全,如果缓存数据的一部分有效,一部分无效,宁可放弃种缓存,也不要把部分数据种入缓存
  3. 数据库与缓存只需要保持弱一致性,而不需要强一致性,读的顺序要先缓存,后数据库,写的顺序要先数据库,后缓存

这里的最佳实践能够解决案例8:缓存和数据库不一致、案例9:本地缓存节点间不一致、案例10:缓存数据结构不一致的问题,对于数据存储层、缓存与数据库、Nosql等的一致性是更深入的存储一致性技术,将会在后续文章单独介绍,这里的数据一致性主要是处理应用层与缓存、应用层与数据库、一部分的缓存与数据库的一致性。

3.5 专题模式

这一节介绍特殊场景下的一致性问题和解决方案。

迁移开关的设计

在大多数企业里,新项目和老项目一般会共存,大家都在努力的下掉老项目,但是由于种种原因总是下不掉,如果要彻底的下掉老项目,就必须要有非常完善的迁移方案,迁移是一项非常复杂而艰巨的任务,我会在将来的文章中详细探讨迁移方案、流程和技术,这里我们只对迁移中使用的开关进行描述。

迁移过程必须使用开关,开关一般都会基于多个维度来设计,例如:全局的、用户的、角色的、商户的、产品的等等,如果迁移过程中遇到问题,我们需要关闭开关,迁移回老的系统,这需要我们的新系统兼容老的数据,老的系统也兼容新的数据,从某种意义上来讲,迁移比实现新系统更加困难。

曾经看过很多简单的开关设计,有的开关设计在应用层次,通过一个curl语句调用,没有权限控制,这样的开关在服务池的每个节点都是不同步的、不一致的;还有的系统把开关配置放在中心化的配置系统、数据库或者缓存等,处理的每个请求都通过统一的开关来判断是否迁移等等,这样的开关有一个致命的缺点,服务请求在处理过程中,开关可能会变化,各个节点之间开关可能不同步、不一致,导致重复的请求可能走到新的逻辑又走了老的逻辑,如果新的逻辑和老的逻辑没有保证幂等性,这个请求就被重复处理了,如果是金融行业的应用,可能会导致资金损失,电商系统可能会导致发货并退款等问题。

这里面我们推荐使用订单开关,不管我们在什么维度上设计了开关,接收到服务请求后,我们在请求创建的关联实体(例如:订单)上标记开关,以后的任何处理流程,包括同步的和异步的处理流程,都通过订单上的开关来判断,而不是通过全局的或者基于配置的开关,这样在订单创建的时候,开关已经确定,不再变更,一旦一份数据不再发生变化,那么它永远是线程安全的,并且不会有不一致的问题。

这个模式在生产中使用比较频繁,建议每个企业都把这个模式作为设计评审的一项,如果不检查这一项,很多开发童鞋都会偷懒,直接在配置中或者数据库中做个开关就上线了。

4 总结

本文从一致性问题的实践出发,从大规模高并发服务化系统的实践经验中进行总结,列举导致不一致的具体问题,围绕着具体问题,总结出解决不一致的方法,并且抽象成模式,供大家在开发服务化系统的过程中参考。

另外,由于篇幅有限,还有一些关于分布式一致性的技术无法在一篇文章中与大家分享,包括:paxos算法、raft算法、zab算法、nwr算法、一致性哈希等,我会在后续文章中详细介绍。

5 反馈与建议

from:http://www.jianshu.com/p/1156151e20c8