Category Archives: 高并发

Java concurrency (multi-threading)

Table of Contents
Java concurrency (multi-threading). This article describes how to do concurrent programming with Java. It covers the concepts of parallel programming, immutability, threads, the executor framework (thread pools), futures, callables CompletableFuture and the fork-join framework.

1. Concurrency

1.1. What is concurrency?

Concurrency is the ability to run several programs or several parts of a program in parallel. If a time consuming task can be performed asynchronously or in parallel, this improve the throughput and the interactivity of the program.

A modern computer has several CPU’s or several cores within one CPU. The ability to leverage these multi-cores can be the key for a successful high-volume application.

1.2. Process vs. threads

A process runs independently and isolated of other processes. It cannot directly access shared data in other processes. The resources of the process, e.g. memory and CPU time, are allocated to it via the operating system.

A thread is a so called lightweight process. It has its own call stack, but can access shared data of other threads in the same process. Every thread has its own memory cache. If a thread reads shared data it stores this data in its own memory cache. A thread can re-read the shared data.

A Java application runs by default in one process. Within a Java application you work with several threads to achieve parallel processing or asynchronous behavior.

2. Improvements and issues with concurrency

2.1. Limits of concurrency gains

Within a Java application you work with several threads to achieve parallel processing or asynchronous behavior. Concurrency promises to perform certain task faster as these tasks can be divided into subtasks and these subtasks can be executed in parallel. Of course the runtime is limited by parts of the task which can be performed in parallel.

The theoretical possible performance gain can be calculated by the following rule which is referred to as Amdahl’s Law.

If F is the percentage of the program which can not run in parallel and N is the number of processes, then the maximum performance gain is 1 / (F+ ((1-F)/n)).

2.2. Concurrency issues

Threads have their own call stack, but can also access shared data. Therefore you have two basic problems, visibility and access problems.

A visibility problem occurs if thread A reads shared data which is later changed by thread B and thread A is unaware of this change.

An access problem can occur if several thread access and change the same shared data at the same time.

Visibility and access problem can lead to

  • Liveness failure: The program does not react anymore due to problems in the concurrent access of data, e.g. deadlocks.
  • Safety failure: The program creates incorrect data.

3. Concurrency in Java

3.1. Processes and Threads

A Java program runs in its own process and by default in one thread. Java supports threads as part of the Java language via the Thread code. The Java application can create new threads via this class.

Java 1.5 also provides improved support for concurrency with the in the java.util.concurrent package.

3.2. Locks and thread synchronization

Java provides locks to protect certain parts of the code to be executed by several threads at the same time. The simplest way of locking a certain method or Java class is to define the method or class with the synchronized keyword.

The synchronized keyword in Java ensures:

  • that only a single thread can execute a block of code at the same time
  • that each thread entering a synchronized block of code sees the effects of all previous modifications that were guarded by the same lock

Synchronization is necessary for mutually exclusive access to blocks of and for reliable communication between threads.

You can use the synchronized keyword for the definition of a method. This would ensure that only one thread can enter this method at the same time. Another threads which is calling this method would wait until the first threads leaves this method.

public synchronized void critial() {
    // some thread critical stuff
    // here
}

You can also use the synchronized keyword to protect blocks of code within a method. This block is guarded by a key, which can be either a string or an object. This key is called the lock.

All code which is protected by the same lock can only be executed by one thread at the same time

For example the following datastructure will ensure that only one thread can access the inner block of the add() and next() methods.

package de.vogella.pagerank.crawler;

import java.util.ArrayList;
import java.util.List;

/**
 * Data structure for a web crawler. Keeps track of the visited sites and keeps
 * a list of sites which needs still to be crawled.
 *
 * @author Lars Vogel
 *
 */
public class CrawledSites {
    private List<String> crawledSites = new ArrayList<String>();
    private List<String> linkedSites = new ArrayList<String>();

    public void add(String site) {
        synchronized (this) {
            if (!crawledSites.contains(site)) {
                linkedSites.add(site);
            }
        }
    }

    /**
     * Get next site to crawl. Can return null (if nothing to crawl)
     */
    public String next() {
        if (linkedSites.size() == 0) {
            return null;
        }
        synchronized (this) {
            // Need to check again if size has changed
            if (linkedSites.size() > 0) {
                String s = linkedSites.get(0);
                linkedSites.remove(0);
                crawledSites.add(s);
                return s;
            }
            return null;
        }
    }

}

3.3. Volatile

If a variable is declared with the volatile keyword then it is guaranteed that any thread that reads the field will see the most recently written value. The volatile keyword will not perform any mutual exclusive lock on the variable.

As of Java 5 write access to a volatile variable will also update non-volatile variables which were modified by the same thread. This can also be used to update values within a reference variable, e.g. for a volatile variable person. In this case you must use a temporary variable person and use the setter to initialize the variable and then assign the temporary variable to the final variable. This will then make the address changes of this variable and the values visible to other threads.

4. The Java memory model

4.1. Overview

The Java memory model describes the communication between the memory of the threads and the main memory of the application.

It defines the rules how changes in the memory done by threads are propagated to other threads.

The Java memory model also defines the situations in which a thread re-fresh its own memory from the main memory.

It also describes which operations are atomic and the ordering of the operations.

4.2. Atomic operation

An atomic operation is an operation which is performed as a single unit of work without the possibility of interference from other operations.

The Java language specification guarantees that reading or writing a variable is an atomic operation(unless the variable is of type long or double ). Operations variables of type long or double are only atomic if they declared with the volatile keyword.

Assume i is defined as int. The i++ (increment) operation it not an atomic operation in Java. This also applies for the other numeric types, e.g. long. etc).

The i++ operation first reads the value which is currently stored in i (atomic operations) and then it adds one to it (atomic operation). But between the read and the write the value of i might have changed.

Since Java 1.5 the java language provides atomic variables, e.g. AtomicInteger or AtomicLong which provide methods like getAndDecrement(), getAndIncrement() and getAndSet() which are atomic.

4.3. Memory updates in synchronized code

The Java memory model guarantees that each thread entering a synchronized block of code sees the effects of all previous modifications that were guarded by the same lock.

5. Immutability and Defensive Copies

5.1. Immutability

The simplest way to avoid problems with concurrency is to share only immutable data between threads. Immutable data is data which cannot changed.

To make a class immutable make

  • all its fields final
  • the class declared as final
  • the this reference is not allowed to escape during construction
  • Any fields which refer to mutable data objects are
  • private
  • have no setter method
  • they are never directly returned of otherwise exposed to a caller
  • if they are changed internally in the class this change is not visible and has no effect outside of the class

An immutable class may have some mutable data which is uses to manages its state but from the outside this class nor any attribute of this class can get changed.

For all mutable fields, e.g. Arrays, that are passed from the outside to the class during the construction phase, the class needs to make a defensive-copy of the elements to make sure that no other object from the outside still can change the data

5.2. Defensive Copies

You must protect your classes from calling code. Assume that calling code will do its best to change your data in a way you didn’t expect it. While this is especially true in case of immutable data it is also true for non-immutable data which you still not expect that this data is changed outside your class.

To protect your class against that you should copy data you receive and only return copies of data to calling code.

The following example creates a copy of a list (ArrayList) and returns only the copy of the list. This way the client of this class cannot remove elements from the list.

package de.vogella.performance.defensivecopy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MyDataStructure {
    List<String> list = new ArrayList<String>();

    public void add(String s) {
        list.add(s);
    }

    /**
     * Makes a defensive copy of the List and return it
     * This way cannot modify the list itself
     *
     * @return List
     */
    public List<String> getList() {
        return Collections.unmodifiableList(list);
    }
}

6. Threads in Java

The base means for concurrency are is the java.lang.Threads class. A Thread executes an object of type java.lang.Runnable.

Runnable is an interface with defines the run() method. This method is called by the Thread object and contains the work which should be done. Therefore the “Runnable” is the task to perform. The Thread is the worker who is doing this task.

The following demonstrates a task (Runnable) which counts the sum of a given range of numbers. Create a new Java project called de.vogella.concurrency.threads for the example code of this section.

package de.vogella.concurrency.threads;

/**
 * MyRunnable will count the sum of the number from 1 to the parameter
 * countUntil and then write the result to the console.
 * 

* MyRunnable is the task which will be performed * * @author Lars Vogel * */ public class MyRunnable implements Runnable { private final long countUntil; MyRunnable(long countUntil) { this.countUntil = countUntil; } @Override public void run() { long sum = 0; for (long i = 1; i < countUntil; i++) { sum += i; } System.out.println(sum); } }

The following example demonstrate the usage of the Thread and the Runnable class.

package de.vogella.concurrency.threads;

import java.util.ArrayList;
import java.util.List;

public class Main {

    public static void main(String[] args) {
        // We will store the threads so that we can check if they are done
        List<Thread> threads = new ArrayList<Thread>();
        // We will create 500 threads
        for (int i = 0; i < 500; i++) {
            Runnable task = new MyRunnable(10000000L + i);
            Thread worker = new Thread(task);
            // We can set the name of the thread
            worker.setName(String.valueOf(i));
            // Start the thread, never call method run() direct
            worker.start();
            // Remember the thread for later usage
            threads.add(worker);
        }
        int running = 0;
        do {
            running = 0;
            for (Thread thread : threads) {
                if (thread.isAlive()) {
                    running++;
                }
            }
            System.out.println("We have " + running + " running threads. ");
        } while (running > 0);

    }
}

Using the Thread class directly has the following disadvantages.

  • Creating a new thread causes some performance overhead.
  • Too many threads can lead to reduced performance, as the CPU needs to switch between these threads.
  • You cannot easily control the number of threads, therefore you may run into out of memory errors due to too many threads.

The java.util.concurrent package offers improved support for concurrency compared to the direct usage of Threads. This package is described in the next section.

7. Threads pools with the Executor Framework

You find this examples in the source section in Java project called de.vogella.concurrency.threadpools.

Thread pools manage a pool of worker threads. The thread pools contains a work queue which holds tasks waiting to get executed.

A thread pool can be described as a collection of Runnable objects.

(work queue) and a connections of running threads. These threads are constantly running and are checking the work query for new work. If there is new work to be done they execute this Runnable. The Thread class itself provides a method, e.g. execute(Runnable r) to add a new Runnable object to the work queue.

The Executor framework provides example implementation of the java.util.concurrent.Executor interface, e.g. Executors.newFixedThreadPool(int n) which will create n worker threads. The ExecutorService adds life cycle methods to the Executor, which allows to shutdown the Executor and to wait for termination.

If you want to use one thread pool with one thread which executes several runnables you can use the Executors.newSingleThreadExecutor() method.

Create again the Runnable.

package de.vogella.concurrency.threadpools;

/**
 * MyRunnable will count the sum of the number from 1 to the parameter
 * countUntil and then write the result to the console.
 * 

* MyRunnable is the task which will be performed * * @author Lars Vogel * */ public class MyRunnable implements Runnable { private final long countUntil; MyRunnable(long countUntil) { this.countUntil = countUntil; } @Override public void run() { long sum = 0; for (long i = 1; i < countUntil; i++) { sum += i; } System.out.println(sum); } }

Now you run your runnables with the executor framework.

package de.vogella.concurrency.threadpools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    private static final int NTHREDS = 10;

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
        for (int i = 0; i < 500; i++) {
            Runnable worker = new MyRunnable(10000000L + i);
            executor.execute(worker);
        }
        // This will make the executor accept no new threads
        // and finish all existing threads in the queue
        executor.shutdown();
        // Wait until all threads are finish
        executor.awaitTermination();
        System.out.println("Finished all threads");
    }
}

In case the threads should return some value (result-bearing threads) then you can use the java.util.concurrent.Callable class.

8. Futures and Callables

8.1. Futures and Callables

The executor framework presented in the last chapter uses Runnable objects. Unfortunately a Runnable cannot return a result to the caller.

In case you expect your threads to return a computed result you can use java.util.concurrent.Callable. The Callable object allows to return values after completion.

The Callable object uses generics to define the type of object which is returned.

If you submit a Callable object to an Executor, the framework returns an object of type java.util.concurrent.Future. Future exposes methods allowing a client to monitor the progress of a task being executed by a different thread. Therefore, a Future object can be used to check the status of a Callable. It can also be used to retrieve the result from the Callable.

On the Executor you can use the method submit to submit a Callable and to get a future. To retrieve the result of the future use the get() method.

package de.vogella.concurrency.callables;

import java.util.concurrent.Callable;

public class MyCallable implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long sum = 0;
        for (long i = 0; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }
}
package de.vogella.concurrency.callables;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableFutures {
    private static final int NTHREDS = 10;

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
        List<Future<Long>> list = new ArrayList<Future<Long>>();
        for (int i = 0; i < 20000; i++) {
            Callable<Long> worker = new MyCallable();
            Future<Long> submit = executor.submit(worker);
            list.add(submit);
        }
        long sum = 0;
        System.out.println(list.size());
        // now retrieve the result
        for (Future<Long> future : list) {
            try {
                sum += future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println(sum);
        executor.shutdown();
    }
}

8.2. Drawbacks with Futures and Callables

The Future interface is limited as a model of asynchronously executed tasks. Future allows a client to query a Callable task for its result. It does not provide the option to register a callback method. A callback method would allow you to get a callback once a task is done. In Java 5 you could use ExecutorCompletionService for this purpose but as of Java 8 you can use the CompletableFuture interface which allows to provide a callback interface which is called once a task is completed.

9. CompletableFuture

Asynchronous task handling is important for any application which performs time consuming activities, as IO operations. Two basic approaches to asynchronous task handling are available to a Java application:

  • application logic blocks until a task completes
  • application logic is called once the task completes, this is called a nonblocking approach.

CompletableFuture extends the functionality of the Future interface for asynchronous calls. It also implements the CompletionStage interface. CompletionStage offers methods, that let you attach callbacks that will be executed on completion.

It adds standard techniques for executing application code when a task completes, including various ways to combine tasks. CompletableFuture support both blocking and nonblocking approaches, including regular callbacks.

This callback can be executed in another thread as the thread in which the CompletableFuture is executed.

The following example demonstrates how to create a basic CompletableFuture.

CompletableFuture.supplyAsync(this::doSomething);

CompletableFuture.supplyAsync runs the task asynchronously on the default thread pool of Java. It has the option to supply your custom executor to define the ThreadPool.

package snippet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureSimpleSnippet {
    public static void main(String[] args) {
        long started = System.currentTimeMillis();

        // configure CompletableFuture
        CompletableFuture<Integer> futureCount = createCompletableFuture();

            // continue to do other work
            System.out.println("Took " + (started - System.currentTimeMillis()) + " milliseconds" );

            // now its time to get the result
            try {
              int count = futureCount.get();
                System.out.println("CompletableFuture took " + (started - System.currentTimeMillis()) + " milliseconds" );

               System.out.println("Result " + count);
             } catch (InterruptedException | ExecutionException ex) {
                // Exceptions from the future should be handled here
            }
    }

    private static CompletableFuture<Integer> createCompletableFuture() {
        CompletableFuture<Integer> futureCount = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        // simulate long running task
                        Thread.sleep(5000);
                    } catch (InterruptedException e) { }
                    return 20;
                });
        return futureCount;
    }

}

The usage of the thenApply method is demonstrated by the following code snippet.

package snippet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCallback {
    public static void main(String[] args) {
        long started = System.currentTimeMillis();

        CompletableFuture<String>  data = createCompletableFuture()
                .thenApply((Integer count) -> {
                    int transformedValue = count * 10;
                    return transformedValue;
                }).thenApply(transformed -> "Finally creates a string: " + transformed);

            try {
                System.out.println(data.get());
            } catch (InterruptedException | ExecutionException e) {

            }
    }

    public static CompletableFuture<Integer> createCompletableFuture() {
        CompletableFuture<Integer>  result = CompletableFuture.supplyAsync(() -> {
            try {
                // simulate long running task
                Thread.sleep(5000);
            } catch (InterruptedException e) { }
            return 20;
        });
        return result;
    }

}

10. Nonblocking algorithms

Java 5.0 provides supports for additional atomic operations. This allows to develop algorithm which are non-blocking algorithm, e.g. which do not require synchronization, but are based on low-level atomic hardware primitives such as compare-and-swap (CAS). A compare-and-swap operation check if the variable has a certain value and if it has this value it will perform this operation.

Non-blocking algorithms are typically faster than blocking algorithms, as the synchronization of threads appears on a much finer level (hardware).

For example this created a non-blocking counter which always increases. This example is contained in the project called de.vogella.concurrency.nonblocking.counter.

package de.vogella.concurrency.nonblocking.counter;

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    private AtomicInteger value = new AtomicInteger();
    public int getValue(){
        return value.get();
    }
    public int increment(){
        return value.incrementAndGet();
    }

    // Alternative implementation as increment but just make the
    // implementation explicit
    public int incrementLongVersion(){
        int oldValue = value.get();
        while (!value.compareAndSet(oldValue, oldValue+1)){
             oldValue = value.get();
        }
        return oldValue+1;
    }

}

And a test.

package de.vogella.concurrency.nonblocking.counter;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
        private static final int NTHREDS = 10;

        public static void main(String[] args) {
            final Counter counter = new Counter();
            List<Future<Integer>> list = new ArrayList<Future<Integer>>();

            ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
            for (int i = 0; i < 500; i++) {
                Callable<Integer> worker = new  Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        int number = counter.increment();
                        System.out.println(number );
                        return number ;
                    }
                };
                Future<Integer> submit= executor.submit(worker);
                list.add(submit);

            }


            // This will make the executor accept no new threads
            // and finish all existing threads in the queue
            executor.shutdown();
            // Wait until all threads are finish
            while (!executor.isTerminated()) {
            }
            Set<Integer> set = new HashSet<Integer>();
            for (Future<Integer> future : list) {
                try {
                    set.add(future.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            if (list.size()!=set.size()){
                throw new RuntimeException("Double-entries!!!");
            }

        }


}

The interesting part is how the incrementAndGet() method is implemented. It uses a CAS operation.

public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

The JDK itself makes more and more use of non-blocking algorithms to increase performance for every developer. Developing correct non-blocking algorithm is not a trivial task.

For more information on non-blocking algorithm, e.g. examples for a non-blocking Stack and non-block LinkedList, please see http://www.ibm.com/developerworks/java/library/j-jtp04186/index.html

11. Fork-Join in Java 7

Java 7 introduce a new parallel mechanism for compute intensive tasks, the fork-join framework. The fork-join framework allows you to distribute a certain task on several workers and then wait for the result.

For Java 6.0 you can download the package (jsr166y) from the Download site.

For testing create the Java project “de.vogella.performance.forkjoin”. If you are not using Java 7 you also need to jsr166y.jar to the classpath.

Create first a algorithm package and then the following class.

package algorithm;

import java.util.Random;

/**
 *
 * This class defines a long list of integers which defines the problem we will
 * later try to solve
 *
 */
public class Problem {
    private final int[] list = new int[2000000];

    public Problem() {
        Random generator = new Random(19580427);
        for (int i = 0; i < list.length; i++) {
            list[i] = generator.nextInt(500000);
        }
    }

    public int[] getList() {
        return list;
    }

}

Define now the Solver class as shown in the following example coding.

The API defines other top classes, e.g. RecursiveAction, AsyncAction. Check the Javadoc for details.
package algorithm;

import java.util.Arrays;

import jsr166y.forkjoin.RecursiveAction;

public class Solver extends RecursiveAction {
    private int[] list;
    public long result;

    public Solver(int[] array) {
        this.list = array;
    }

    @Override
    protected void compute() {
        if (list.length == 1) {
            result = list[0];
        } else {
            int midpoint = list.length / 2;
            int[] l1 = Arrays.copyOfRange(list, 0, midpoint);
            int[] l2 = Arrays.copyOfRange(list, midpoint, list.length);
            Solver s1 = new Solver(l1);
            Solver s2 = new Solver(l2);
            forkJoin(s1, s2);
            result = s1.result + s2.result;
        }
    }
}

Now define a small test class for testing it efficiency.

package testing;

import jsr166y.forkjoin.ForkJoinExecutor;
import jsr166y.forkjoin.ForkJoinPool;
import algorithm.Problem;
import algorithm.Solver;

public class Test {

    public static void main(String[] args) {
        Problem test = new Problem();
        // check the number of available processors
        int nThreads = Runtime.getRuntime().availableProcessors();
        System.out.println(nThreads);
        Solver mfj = new Solver(test.getList());
        ForkJoinExecutor pool = new ForkJoinPool(nThreads);
        pool.invoke(mfj);
        long result = mfj.getResult();
        System.out.println("Done. Result: " + result);
        long sum = 0;
        // check if the result was ok
        for (int i = 0; i < test.getList().length; i++) {
            sum += test.getList()[i];
        }
        System.out.println("Done. Result: " + sum);
    }
}

12. Deadlock

A concurrent application has the risk of a deadlock. A set of processes are deadlocked if all processes are waiting for an event which another process in the same set has to cause.

For example if thread A waits for a lock on object Z which thread B holds and thread B wait for a look on object Y which is hold be process A then these two processes are locked and cannot continue in their processing.

This can be compared to a traffic jam, where cars(threads) require the access to a certain street(resource), which is currently blocked by another car(lock).

Deadlock

Copyright © 2012-2017 vogella GmbH. Free use of the software examples is granted under the terms of the EPL License. This tutorial is published under the Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Germany license.

See Licence.

from:http://www.vogella.com/tutorials/JavaConcurrency/article.html

Pinterest谈实战经验:如何在两年内实现零到数百亿的月访问

Pinterest一直保持着指数增长,每一个半月都会翻一翻。在两年内,他们实现了从0到数百亿的月PV;从开始的两个创始人加一个工程师增长到现在超过40个工程师,从一个小型的MySQL服务器增长到180个Web Enigne、240个API Enigne、88个MySQL DB(cc2.8xlarge,每个DB都会配置一个从属节点)、110个Redis Instance以及200个Mmecache Instance。

在一个名为 《Scaling Pinterest》 的主题演讲上,Pinterest的Yashwanth NelapatiMarty Weiner为我们讲述了这个戏剧性的过程。当然扩展到当下规模,Pinterest在众多选择中不可避免的走了许多的弯路,而Todd Hoff认为其中最宝贵的经验该归结于以下两点:

  1. 如果你的架构应对增长所带来的问题时,只需要简单的投入更多的主机,那么你的架构含金量十足。
  2. 当你把事物用至极限时,这些技术都会以各自不同的方式发生故障,这导致他们对工具的选择有着特殊的偏好:成熟、简单、优秀、知名、被更多的用户喜爱、更好的支持、稳定且杰出的表现、通常情况下无故障以及免费。使用这些标准,他们选择了MySQL、Solr、Memcache、Redis、Cassandra,同时还抛弃了MongoDB。

同样这两个点是有关联的,符合第二个原则的工具就可以通过投入更多的主机进行扩展。即使负载的增加,项目也不会出现很多故障。即使真的出现难以解决的问题,至少有一个社区去寻找问题解决的方案。一旦你选择过于复杂和挑剔的工具,在扩展的道路上将充满荆棘。

需要注意的是所有他们选择的工具都依靠增加分片来进行扩展,而非通过集群。讲话中还阐述了为什么分片优于集群以及如何进行分片,这些想法可能是之前你闻所未闻的。

下面就看一下Pinterest扩展的阶段性时间轴:

项目背景

  • Pins是由其它零零碎碎信息集合成的图片,显示了对客户重要的信息,并且链接到它所在的位置。
  • Pinterest是一个社交网络,你可以follow(关注)其他人以及board。
  • 数据库:Pinterest的用户拥有board,而每个board都包含pin;follow及repin人际关系、验证信息。

1. 2010年3月发布——寻找真我的时代

在那时候,你甚至不知道需要建立一个什么样的产品。你有想法,所以你快速的迭代以及演变。而最终你将得到一些很小的MySQL查询,而这些查询在现实生活中你从未进行过。

Pinterest初期阶段的一些数字:

  • 2个创始人
  • 1个工程师
  • Rackspace
  • 1个小的网络引擎
  • 1个小的MySQL数据库
  • 2011年11月

仍然是小规模,产品通过用户反馈进行演变后的数字是:

  • Amazon EC2 + S3 + CloudFront
  • 1 NGinX, 4 Web Engines (用于冗余,不全是负载)
  • 1 MySQL DB + 1 Read Slave (用于主节点故障情况)
  • 1 Task Queue + 2 Task Processors
  • 1 MongoDB (用于计数)
  • 2 Engineers

2. 贯穿2011年——实验的时代

迈上疯狂增长的脚步,基本上每1个半月翻一翻。

  • 当你增长的如此之快,每一天每一星期你可能都需要打破或者抛弃一些东西。
  • 在这个时候,他们阅读大量的论文,这些论文都阐述着只需要添加一台主机问题就会得以解决。他们着手添加许多技术,随后又不得不放弃。
  • 于是出现了一些很奇怪的结果
  • Amazon EC2 + S3 + CloudFront
  • 2NGinX, 16 Web Engines + 2 API Engines
  • 5 Functionally Sharged MySQL DB + 9 read slaves
  • 4 Cassandra Nodes
  • 15 Membase Nodes (3 separate clusters)
  • 8 Memcache Nodes
  • 10 Redis Nodes
  • 3 Task Routers + 4 Task Processors
  • 4 Elastic Search Nodes
  • 3 Mongo Clusters
  • 3个工程师
  • 5个主数据库技术,只为了独立其中的数据。
  • 增长太快以至于MySQL疲于奔命,所有其它的技术也达到了极限。
  • 当你把事物用至极限时,这些技术都会以各自不同的方式出错。
  • 开始抛弃一些技术,并且自我反省究竟需要些什么,基本上重做了所有的架构。

3. 2012年2月——成熟的时代

  • 在重做了所有的架构后,系统呈现了如下状态
  • Amazon EC2 + S3 + Akamai, ELB
  • 90 Web Engines + 50 API Engines
  • 66 MySQL DBs (m1.xlarge) +,每个数据库都配备了从属节点
  • 59 Redis Instances
  • 51 Memcache Instances
  • 1 Redis Task Manager + 25 Task Processors
  • Sharded Solr
  • 6个工程师
  • 现在采用的技术是被分片的MySQL、Redis、Memcache和Solr,有点在于这些技术都很简单很成熟。
  • 网络传输增长仍然保持着以往的速度,而iPhone传输开始走高。

4. 2012年10月12日 —— 收获的季节

大约是1月份的4倍

  • 现在的数据是:
  • Amazon EC2 + S3 + Edge Cast,Akamai, Level 3
  • 180 Web Engines + 240 API Engines
  • 88 MySQL DBs (cc2.8xlarge) ,同样每个数据库都有一个从属节点
  • 110 Redis Instances
  • 200 Memcache Instances
  • 4 Redis Task Manager + 80 Task Processors
  • Sharded Solr
  • 40个工程师(仍在增长)
  • 需要注意的是,如今的架构已趋近完美,应对增长只需要投入更多的主机。
  • 当下已开始转移至SSD

下面一览该演讲中的干货,决策的制定:

为什么会选择EC2和S3

  1. 相当好的可靠性,即使数据中心发生故障。多租户会增加风险,但是也不是太坏。
  2. 良好的报告和支持。它们(EC2和S3)有着良好的架构,并且知道问题所在。
  3. 完善的周边设施,特别是在你需要快速增长时。你可以从APP Engine处获得maged cache、负载均衡、MapReduce、数据库管理以及其它你不想自己动手编写的组件,这可以加速你应用程序的部署,而在你工程师空闲时,你可以着手编写你需要的一切。
  4. 新的实例可以在几秒内就绪,这就是云的力量;特别是在只有两个工程师的初期,不需要去担心容量规划,更不需要花两个星期去建立自己的Memcache,你可以在数分钟内添加10个Memcached。
  5. 缺点:有限的选择。直到最近,才可以选择使用SSD,同时无法获得太大的内存配置。
  6. 优点:你不需要给大量的主机进行不同的配置。

为什么会选择MySQL

  1. 非常成熟。
  2. 非常稳定。不会宕机,并且不会丢失数据。
  3. 在招聘上具有优势,市场上有大把的人才。
  4. 在请求呈直线上升时,仍能将相应时间控制在一定的范围内,有些数据库技术在面对请求的飙升时表现并不是很好。
  5. 非常好的周边软件支持——XtraBackup、Innotop、Maatkit。
  6. 可以从类似Percona这样的公司得到优秀的技术支持。
  7. 开源(免费)——这一点非常重要,特别是在资金缺乏的初期

为什么使用Memcache

  • 非常成熟。
  • 非常简单。可以当成是一个socket哈希表
  • 杰出稳定的表现
  • 知名并为大量用户喜爱
  • 永不崩溃
  • 开源

为什么选择Redis

  • 虽然还不够成熟,但是非常简单及优秀
  • 提供了大量的数据结构类型
  • 提供多种的选择进行持久化和备份:你可以备份而非持久化,选择备份的话你还可以选择多久备份一次;同样你还可以选择使用什么方式进行持久化,比如MySQL等。
  • Home feed被储存在Redis上,每3个小时保存一次;然而并不是3个小时持久化一次,只是简单的每3个小时备份一次。
  • 如果你存储数据的主机发生故障,丢失的也只是备份周期内的数据。虽然不是完全可靠,但是非常简单。避免了复杂的持久化及复制,这样的架构简单且便宜。
  • 知名并为大量用户喜爱
  • 稳定且杰出的表现
  • 很少出故障。有一些专有的故障模型,你需要学会解决。这也是成熟的优势,只需要学习就可以解决。
  • 开源

Solr

  1. 只需要几分钟的安装时间,就可以投入使用
  2. 不能扩展到多于一台的机器上(最新版本并非如此)
  3. 尝试弹性搜索,但是以Pinterest的规模来说,可能会因为零碎文件和查询太多而产生问题。
  4. 选择使用Websolr,但是Pinterest拥有搜索团队,将来可能会开发自己的版本。

集群vs.分片

  • 在迅速扩展的过程中,Pinterest认识到每次负载的增加,都需要均匀的传播他们的数据。
  • 针对问题先确定解决方案的范围,他们选择的范围是集群和分片之间的一系列解决方案。

集群——所有的操作都是通过自动化

  • 比如:Cassandra、MemBase、HBase
  • 结论:没有安全感,将来可能会比较成熟,但是当下这个解决方案中还存在太多的复杂性和故障点。
  • 特性:
  • 数据自动分布
  • 节点间转移数据
  • 需要平衡分配
  • 节点间的相互通信,需要做很多措施用于防止干扰、无效传递及协商。
  • 优点:
  • 自动扩展你的数据存储,最起码论文中是这么说的。
  • 便于安装
  • 数据上的空间分布及机房共置。你可以在不同区域建立数据中心,数据库会帮你打理好一切。
  • 高有效性
  • 负载平衡
  • 不存在单点故障
  • 缺点:
  • 仍然不成熟。
  • 本质上说还很复杂。一大堆的节点必须对称协议,这一点非常难以解决。
  • 缺少社区支持。社区的讨论因为产品方向的不同而不能统一,而在每个正营中也缺乏强有力的支持。
  • 缺乏领域内资深工程师,可能大多数的工程师都还未使用过Cassandra。
  • 困难、没有安全感的机制更新。这可能是因为这些技术都使用API并且只在自己的领域内通行,这导致了复杂的升级路径。
  • 集群管理算法本身就用于处理SPOF(单点故障),如果存在漏洞的话可能就会影响到每个节点。
  • 集群管理器代码非常复杂,并且需要在所有节点上重复,这就可能存在以下的故障模式:
  • 数据平衡失控。当给集群中添加新的主机时,可能因为数据的拷贝而导致集群性能下降。那么你该做什么?这里不存在去发现问题所在的工具。没有社区可以用来求助,同样你也被困住了,这也是Pinterest回到MySQL的原因。
  • 跨节点的数据损坏。如果这里存在一个漏洞,这个漏洞可能会影响节点间的日志系统和压缩等其它组件?你的读延时增加,所有的数据都会陷入麻烦以及丢失。
  • 错误负载平衡很难被修复,这个现象十分普遍。如果你有10个节点,并且你注意到所有的负载都被堆积到一个节点上。虽然可以手动处理,但是之后系统还会将负载都加之一个节点之上。
  • 数据所有权问题,主次节点转换时的数据丢失。集群方案是非常智能的,它们会在特定的情况下完成节点权利的转换,而主次节点切换的过程中可能会导致数据的部分丢失,而丢失部分数据可能比丢失全部还糟糕,因为你不可能知道你究竟丢失了哪一部分。

分片——所有事情都是手动的

  • 结论:它是获胜者。Todd Hoff还认为他们的分片架构可能与Flickr架构类似。
  • 特性:
  • 分片可以让你摆脱集群方案中所有不想要的特性。
  • 数据需要手动的分配。
  • 数据不会移动。Pinterest永远都不会在节点间移动,尽管有些人这么做,这让他们在一定范围内站的更高。
  • 通过分割数据的方式分配负载。
  • 节点并没有互相通信,使用一些主节点控制程序的运行。
  • 优点:
  • 可以分割你的数据库以提高性能。
  • 空间分布及放置数据
  • 高有效性
  • 负载平衡
  • 放置数据的算法非常简单。主要原因是,用于处理单点故障的代码只有区区的半页,而不是一个复杂的集群管理器。并且经过短暂的测试就知道它是否能够正常工作。
  • ID生成非常简单
  • 缺点:
  • 不可以执行大多数的join。
  • 失去所有事务的能力。在一个数据库上的插入可能会成功,而在另一个上会失败。
  • 许多约束必须放到应用程序层。
  • 模式的转变需要从长计议。
  • 报告需要在所有分片上执行查询,然后需要手动的进行聚合。
  • Join在应用程序层执行。
  • 应用程序必须容忍以上所有问题。

什么时候进行分片

  1. 如果你的项目拥有PB级的数据,那么你需要立刻对其进行分片。
  2. Pin表格拥有百万行索引,索引大小已经溢出内存并被存入了磁盘。
  3. Pinterest使用了最大的表格,并将它们(这些索引)放入自己的数据库。
  4. 然后果断的超过了单数据库容量。
  5. 接着Pinterest必须进行分片。

分片的过渡

  • 过渡从一个特性的冻结开始。
  • 确认分片该达到什么样的效果——希望尽少的执行查询以及最少数量的数据库去呈现一个页面。
  • 剔除所有的MySQL join,将要做join的表格加载到一个单独的分片去做查询。
  • 添加大量的缓存,基本上每个查询都需要被缓存。
  • 这个步骤看起来像:
  • 1 DB + Foreign Keys + Joins
  • 1 DB + Denormalized + Cache
  • 1 DB + Read Slaves + Cache
  • Several functionally sharded DBs+Read Slaves+Cache
  • ID sharded DBs + Backup slaves + cache
  • 早期的只读从属节点一直都存在问题,因为存在slave lag。读任务分配给了从属节点,然而主节点并没有做任何的备份记录,这样就像一条记录丢失。之后Pinterest使用缓存解决了这个问题。
  • Pinterest拥有后台脚本,数据库使用它来做备份。检查完整性约束、引用。
  • 用户表并不进行分片。Pinterest只是使用了一个大型的数据库,并在电子邮件和用户名上做了相关的一致性约束。如果插入重复用户,会返回失败。然后他们对分片的数据库做大量的写操作。

如何进行分片

  • 可以参考Cassandra的ring模型、Membase以及Twitter的Gizzard。
  • 坚信:节点间数据传输的越少,你的架构越稳定。
  • Cassandra存在数据平衡和所有权问题,因为节点们不知道哪个节点保存了另一部分数据。Pinterest认为应用程序需要决定数据该分配到哪个节点,那么将永远不会存在问题。
  • 预计5年内的增长,并且对其进行预分片思考。
  • 初期可以建立一些虚拟分片。8个物理服务器,每个512DB。所有的数据库都装满表格。
  • 为了高有效性,他们一直都运行着多主节点冗余模式。每个主节点都会分配给一个不同的可用性区域。在故障时,该主节点上的任务会分配给其它的主节点,并且重新部署一个主节点用以代替。
  • 当数据库上的负载加重时:
  • 先着眼节点的任务交付速度,可以清楚是否有问题发生,比如:新特性,缓存等带来的问题。
  • 如果属于单纯的负载增加,Pinterest会分割数据库,并告诉应用程序该在何处寻找新的节点。
  • 在分割数据库之前,Pinterest会给这些主节点加入一些从属节点。然后置换应用程序代码以匹配新的数据库,在过渡的几分钟之内,数据会同时写入到新旧节点,过渡结束后将切断节点之间的通道。

ID结构

  • 一共64位
  • 分片ID:16位
  • Type:10位—— Board、User或者其它对象类型
  • 本地ID——余下的位数用于表中ID,使用MySQL自动递增。
  • Twitter使用一个映射表来为物理主机映射ID,这将需要备份;鉴于Pinterest使用AWS和MySQL查询,这个过程大约需要3毫秒。Pinterest并没有让这个额外的中间层参与工作,而是将位置信息构建在ID里。
  • 用户被随机分配在分片中间。
  • 每个用户的所有数据(pin、board等)都存放在同一个分片中,这将带来巨大的好处,避免了跨分片的查询可以显著的增加查询速度。
  • 每个board都与用户并列,这样board可以通过一个数据库处理。
  • 分片ID足够65536个分片使用,但是开始Pinterest只使用了4096个,这允许他们轻易的进行横向扩展。一旦用户数据库被填满,他们只需要增加额外的分片,然后让新用户写入新的分片就可以了。

查找

  • 如果存在50个查找,举个例子,他们将ID分割且并行的运行查询,那么延时将达到最高。
  • 每个应用程序都有一个配置文件,它将给物理主机映射一个分片范围。
  • “sharddb001a”: : (1, 512)
  • “sharddb001b”: : (513, 1024)——主要备份主节点
  • 如果你想查找一个ID坐落在sharddb003a上的用户:
  • 将ID进行分解
  • 在分片映射中执行查找
  • 连接分片,在数据库中搜寻类型。并使用本地ID去寻找这个用户,然后返回序列化数据。

对象和映射

  • 所有数据都是对象(pin、board、user、comment)或者映射(用户由baord,pin有like)。
  • 针对对象,每个本地ID都映射成MySQL Blob。开始时Blob使用的是JSON格式,之后会给转换成序列化的Thrift。
  • 对于映射来说,这里有一个映射表。你可以为用户读取board,ID包含了是时间戳,这样就可以体现事件的顺序。
  • 同样还存在反向映射,多表对多表,用于查询有哪些用户喜欢某个pin这样的操作。
  • 模式的命名方案是:noun_verb_noun: user_likes_pins, pins_like_user。
  • 只能使用主键或者是索引查找(没有join)。
  • 数据不会向集群中那样跨数据的移动,举个例子:如果某个用户坐落在20分片上,所有他数据都会并列存储,永远不会移动。64位ID包含了分片ID,所以它不可能被移动。你可以移动物理数据到另一个数据库,但是它仍然与相同分片关联。
  • 所有的表都存放在分片上,没有特殊的分片,当然用于检测用户名冲突的巨型表除外。
  • 不需要改变模式,一个新的索引需要一个新的表。
  • 因为键对应的值是blob,所以你不需要破坏模式就可以添加字段。因为blob有不同的版本,所以应用程序将检测它的版本号并且将新记录转换成相应的格式,然后写入。所有的数据不需要立刻的做格式改变,可以在读的时候进行更新。
  • 巨大的胜利,因为改变表格需要在上面加几个小时甚至是几天的锁。如果你需要一个新的索引,你只需要建立一张新的表格,并填入内容;在不需要的时候,丢弃就好。

呈现一个用户文件界面

  1. 从URL中取得用户名,然后到单独的巨型数据库中查询用户的ID。
  2. 获取用户ID,并进行拆分
  3. 选择分片,并进入
  4. SELECT body from users WHERE id =
  5. SELECT board_id FROM user_has_boards WHERE user_id=
  6. SELECT body FROM boards WHERE id IN ()
  7. SELECT pin_id FROM board_has_pins WHERE board_id=
  8. SELECT body FROM pins WHERE id IN (pin_ids)
  9. 所有调用都在缓存中进行(Memcache或者Redis),所以在实践中并没有太多连接数据库的后端操作。

脚本相关

  1. 当你过渡到一个分片架构,你拥有两个不同的基础设施——没有进行分片的旧系统和进行分片的新系统。脚本成为了新旧系统之间数据传输的桥梁。
  2. 移动5亿的pin、16亿的follower行等。
  3. 不要轻视项目中的这一部分,Pinterest原认为只需要2个月就可以完成数据的安置,然而他们足足花了4至5个月时间,别忘了期间他们还冻结了一项特性。
  4. 应用程序必须同时对两个系统插入数据。
  5. 一旦确认所有的数据都在新系统中就位,就可以适当的增加负载来测试新后端。
  6. 建立一个脚本农场,雇佣更多的工程师去加速任务的完成。让他们做这些表格的转移工作。
  7. 设计一个Pyres副本,一个到GitHub Resque队列的Python的接口,这个队列建立在Redis之上。支持优先级和重试,使用Pyres取代Celery和RabbitMQ更是让他们受益良多。
  8. 处理中会产生大量的错误,用户可能会发现类似丢失board的错误;必须重复的运行任务,以保证在数据的处理过程中不会出现暂时性的错误。

开发相关

  • 开始尝试只给开发者开放系统的一部分——他们每个人都拥有自己的MySQL服务器等,但是事情改变的太快,以至于这个模式根本无法实行。
  • 转变成Facebook模式,每个人都可以访问所有东西,所以不得不非常小心。

未来的方向

  • 基于服务的架构
  • 当他们发现大量的数据库负载,他们开始布置大量的应用程序服务器和一些其它的服务器,所有这些服务器都连接至MySQL和Memcache。这意味着在Memcache上将存在3万的连接,这些连接将占用几个G的内存,同时还会产生大量的Memcache守护进程。
  • 为了解决这个问题,将这些工作转移到了一个服务架构。比如:使用一个follower服务,这个服务将专注处理follower查询。这将接下30台左右的主机去连接数据库和缓存,从而减少了连接的数量。
  • 对功能进行隔离,各司其职。让一个服务的开发者不能访问其它的服务,从而杜绝安全隐患。

学到的知识

  1. 为了应对未来的问题,让其保持简单。
  2. 让其变的有趣。只要应用程序还在使用,就会有很多的工程师加入,过于复杂的系统将会让工作失去乐趣。让架构保持简单就是大的胜利,新的工程师从入职的第一周起就可以对项目有所贡献。
  3. 当你把事物用至极限时,这些技术都会以各自不同的方式发生故障。
  4. 如果你的架构应对增长所带来的问题时,只需要简单的投入更多的主机,那么你的架构含金量十足。
  5. 集群管理算法本身就用于处理SPOF,如果存在漏洞的话可能就会影响到每个节点。
  6. 为了快速的增长,你需要为每次负载增加的数据进行均匀分配。
  7. 在节点间传输的数据越少,你的架构越稳定。这也是他们弃集群而选择分片的原因。
  8. 一个面向服务的架构规则。拆分功能,可以帮助减少连接、组织团队、组织支持以及提升安全性。
  9. 搞明白自己究竟需要什么。为了匹配愿景,不要怕丢弃某些技术,甚至是整个系统的重构。
  10. 不要害怕丢失一点数据。将用户数据放入内存,定期的进行持久化。失去的只是几个小时的数据,但是换来的却是更简单、更强健的系统!

原文链接: Scaling Pinterest – From 0 To 10s Of Billions Of Page Views A Month In Two Years (编译/仲浩 审校/王旭东)

from:http://www.csdn.net/article/2013-04-16/2814902-how-pinterest-scaling-0-to-billions-pv

Spring SpringMVC Mybatis 分布式系统

前言

zheng项目创建于2016年10月4日,正在慢慢成长中,目的不仅仅是一个开发架构,而是努力打造一套从 前端模板基础框架分布式架构开源项目持续集成自动化部署系统监测无缝升级 的全方位J2EE企业级开发解决方案。

项目介绍

基于Spring+SpringMVC+Mybatis分布式敏捷开发系统架构,提供整套公共微服务服务模块:内容管理、支付中心、用户管理(包括第三方)、微信平台、存储系统、配置中心、日志分析、任务和通知等,支持服务治理、监控和追踪,努力为中小型企业打造全方位J2EE企业级开发解决方案。

组织结构

zheng
├── zheng-common -- SSM框架公共模块
├── zheng-admin -- 后台管理模板
├── zheng-ui -- 前台thymeleaf模板[端口:1000]
├── zheng-config -- 配置中心[端口:1001]
├── zheng-upms -- 用户权限管理系统
|    ├── zheng-upms-common -- upms系统公共模块
|    ├── zheng-upms-dao -- 代码生成模块,无需开发
|    ├── zheng-upms-client -- 集成upms依赖包,提供单点认证、授权、统一会话管理
|    ├── zheng-upms-rpc-api -- rpc接口包
|    ├── zheng-upms-rpc-service -- rpc服务提供者
|    └── zheng-upms-server -- 用户权限系统及SSO服务端[端口:1111]
├── zheng-cms -- 内容管理系统
|    ├── zheng-cms-common -- cms系统公共模块
|    ├── zheng-cms-dao -- 代码生成模块,无需开发
|    ├── zheng-cms-rpc-api -- rpc接口包
|    ├── zheng-cms-rpc-service -- rpc服务提供者
|    ├── zheng-cms-search -- 搜索服务[端口:2221]
|    ├── zheng-cms-admin -- 后台管理[端口:2222]
|    ├── zheng-cms-job -- 消息队列、任务调度等[端口:2223]
|    └── zheng-cms-web -- 网站前台[端口:2224]
├── zheng-pay -- 支付系统
|    ├── zheng-pay-common -- pay系统公共模块
|    ├── zheng-pay-dao -- 代码生成模块,无需开发
|    ├── zheng-pay-rpc-api -- rpc接口包
|    ├── zheng-pay-rpc-service -- rpc服务提供者
|    ├── zheng-pay-sdk -- 开发工具包
|    ├── zheng-pay-admin -- 后台管理[端口:3331]
|    └── zheng-pay-web -- 演示示例[端口:3332]
├── zheng-ucenter -- 用户系统(包括第三方登录)
|    ├── zheng-ucenter-common -- ucenter系统公共模块
|    ├── zheng-ucenter-dao -- 代码生成模块,无需开发
|    ├── zheng-ucenter-rpc-api -- rpc接口包
|    ├── zheng-ucenter-rpc-service -- rpc服务提供者
|    └── zheng-ucenter-web -- 网站前台[端口:4441]
├── zheng-wechat -- 微信系统
|    ├── zheng-wechat-mp -- 微信公众号管理系统
|    |    ├── zheng-wechat-mp-dao -- 代码生成模块,无需开发
|    |    ├── zheng-wechat-mp-service -- 业务逻辑
|    |    └── zheng-wechat-mp-admin -- 后台管理[端口:5551]
|    └── zheng-ucenter-app -- 微信小程序后台
├── zheng-api -- API接口总线系统
|    ├── zheng-api-common -- api系统公共模块
|    ├── zheng-api-rpc-api -- rpc接口包
|    ├── zheng-api-rpc-service -- rpc服务提供者
|    └── zheng-api-server -- api系统服务端[端口:6666]
├── zheng-oss -- 对象存储系统
|    ├── zheng-oss-sdk -- 开发工具包
|    ├── zheng-oss-web -- 前台接口[端口:7771]
|    └── zheng-oss-admin -- 后台管理[端口:7772]
├── zheng-shop -- 电子商务系统
├── zheng-im -- 即时通讯系统
├── zheng-oa -- 办公自动化系统
├── zheng-eoms -- 运维系统
└── zheng-demo -- 示例模块(包含一些示例代码等)
     ├── zheng-demo-rpc-api -- rpc接口包
     ├── zheng-demo-rpc-service -- rpc服务提供者
     └── zheng-demo-web -- 演示示例[端口:8888]

技术选型

后端技术:

技术 名称 官网
Spring Framework 容器 http://projects.spring.io/spring-framework/
SpringMVC MVC框架 http://docs.spring.io/spring/docs/current/spring-framework-reference/htmlsingle/#mvc
Apache Shiro 安全框架 http://shiro.apache.org/
Spring session 分布式Session管理 http://projects.spring.io/spring-session/
MyBatis ORM框架 http://www.mybatis.org/mybatis-3/zh/index.html
MyBatis Generator 代码生成 http://www.mybatis.org/generator/index.html
PageHelper MyBatis物理分页插件 http://git.oschina.net/free/Mybatis_PageHelper
Druid 数据库连接池 https://github.com/alibaba/druid
FluentValidator 校验框架 https://github.com/neoremind/fluent-validator
Thymeleaf 模板引擎 http://www.thymeleaf.org/
Velocity 模板引擎 http://velocity.apache.org/
ZooKeeper 分布式协调服务 http://zookeeper.apache.org/
Dubbo 分布式服务框架 http://dubbo.io/
TBSchedule & elastic-job 分布式调度框架 https://github.com/dangdangdotcom/elastic-job
Redis 分布式缓存数据库 https://redis.io/
Solr & Elasticsearch 分布式全文搜索引擎 http://lucene.apache.org/solr/ https://www.elastic.co/
Quartz 作业调度框架 http://www.quartz-scheduler.org/
Ehcache 进程内缓存框架 http://www.ehcache.org/
ActiveMQ 消息队列 http://activemq.apache.org/
JStorm 实时流式计算框架 http://jstorm.io/
FastDFS 分布式文件系统 https://github.com/happyfish100/fastdfs
Log4J 日志组件 http://logging.apache.org/log4j/1.2/
Swagger2 接口测试框架 http://swagger.io/
sequence 分布式高效ID生产 http://git.oschina.net/yu120/sequence
AliOSS & Qiniu & QcloudCOS 云存储 https://www.aliyun.com/product/oss/ http://www.qiniu.com/ https://www.qcloud.com/product/cos
Protobuf & json 数据序列化 https://github.com/google/protobuf
Jenkins 持续集成工具 https://jenkins.io/index.html
Maven 项目构建管理 http://maven.apache.org/

前端技术:

技术 名称 官网
jQuery 函式库 http://jquery.com/
Bootstrap 前端框架 http://getbootstrap.com/
Bootstrap-table Bootstrap数据表格 http://bootstrap-table.wenzhixin.net.cn/
Font-awesome 字体图标 http://fontawesome.io/
material-design-iconic-font 字体图标 https://github.com/zavoloklom/material-design-iconic-font
Waves 点击效果插件 https://github.com/fians/Waves
zTree 树插件 http://www.treejs.cn/v3/
Select2 选择框插件 https://github.com/select2/select2
jquery-confirm 弹出窗口插件 https://github.com/craftpip/jquery-confirm
jQuery EasyUI 基于jQuery的UI插件集合体 http://www.jeasyui.com
React 界面构建框架 https://github.com/facebook/react
Editor.md Markdown编辑器 https://github.com/pandao/editor.md
zhengAdmin 后台管理系统模板 https://github.com/shuzheng/zhengAdmin
autoMail 邮箱地址自动补全插件 https://github.com/shuzheng/autoMail
zheng.jprogress.js 加载进度条插件 https://github.com/shuzheng/zheng.jprogress.js
zheng.jtotop.js 返回顶部插件 https://github.com/shuzheng/zheng.jtotop.js

架构图

架构图

模块依赖

模块依赖

模块介绍

zheng-common

Spring+SpringMVC+Mybatis框架集成公共模块,包括公共配置、MybatisGenerator扩展插件、通用BaseService、工具类等。

zheng-admin

基于bootstrap实现的响应式Material Design风格的通用后台管理系统,zheng项目所有后台系统都是使用该模块界面作为前端展示。

zheng-ui

各个子系统前台thymeleaf模板,前端资源模块,使用nginx代理,实现动静分离。

zheng-upms

本系统是基于RBAC授权和基于用户授权的细粒度权限控制通用平台,并提供单点登录、会话管理和日志管理。接入的系统可自由定义组织、角色、权限、资源等。用户权限=所拥有角色权限合集+用户加权限-用户减权限,优先级:用户减权限>用户加权限>角色权限

zheng-oss

文件存储系统,提供四种方案:

  • 阿里云 OSS
  • 腾讯云 COS
  • 七牛云
  • 本地分布式存储

阿里云OSS

zheng-api

接口总线系统,对外暴露统一规范的接口,包括各个子系统的交互接口、对外开放接口、开发加密接口、接口文档等服务,示例图:

API网关

zheng-cms

内容管理系统:支持多标签、多类目、强大评论的内容管理,有基本单页展示,菜单管理,系统设置等功能。

zheng-pay

  • 一站式支付解决方案,统一下单接口,支持支付宝、微信、网银等多种支付方式。不涉及业务的纯粹的支付平台。
  • 统一下单(统一下单接口、统一扫码)、订单管理、数据分析、财务报表、商户管理、渠道管理、对账系统、系统监控。

统一扫码支付

zheng-ucenter

通用用户管理系统, 实现最常用的用户注册、登录、资料管理、个人中心、第三方登录等基本需求,支持扩展二次开发。

zheng-wechat-mp

微信公众号管理平台,除实现官网后台自动回复、菜单管理、素材管理、用户管理、消息群发等基础功能外,还有二维码推广、营销活动、微网站、会员卡、优惠券等。

zheng-wechat-app

微信小程序后台

环境搭建(QQ群内有“zheng环境搭建和系统部署文档.doc”)

开发工具:

  • MySql: 数据库
  • jetty: 开发服务器
  • Tomcat: 应用服务器
  • SVN|Git: 版本管理
  • Nginx: 反向代理服务器
  • Varnish: HTTP加速器
  • IntelliJ IDEA: 开发IDE
  • PowerDesigner: 建模工具
  • Navicat for MySQL: 数据库客户端

开发环境:

  • Jdk7+
  • Mysql5.5+
  • Redis
  • Zookeeper
  • ActiveMQ
  • Dubbo-admin
  • Dubbo-monitor

工具安装

环境搭建和系统部署文档(作者:小兵,QQ群共享提供下载)

资源下载

开发指南:

  • 1、本机安装Jdk7、Mysql、Redis、Zookeeper、ActiveMQ并启动相关服务,使用默认配置默认端口即可
  • 2、克隆源代码到本地并打开,推荐使用IntelliJ IDEA,本地编译并安装到本地maven仓库

修改本地Host

  • 127.0.0.1 ui.zhangshuzheng.cn
  • 127.0.0.1 upms.zhangshuzheng.cn
  • 127.0.0.1 cms.zhangshuzheng.cn
  • 127.0.0.1 pay.zhangshuzheng.cn
  • 127.0.0.1 ucenter.zhangshuzheng.cn
  • 127.0.0.1 wechat.zhangshuzheng.cn
  • 127.0.0.1 api.zhangshuzheng.cn
  • 127.0.0.1 oss.zhangshuzheng.cn
  • 127.0.0.1 config.zhangshuzheng.cn
  • 127.0.0.1 zkserver
  • 127.0.0.1 rdserver
  • 127.0.0.1 dbserver
  • 127.0.0.1 mqserver

编译流程

maven编译安装zheng/pom.xml文件即可

启动顺序(后台)

准备工作

  • 新建zheng数据库,导入project-datamodel文件夹下的zheng.sql
  • 修改各dao模块和rpc-service模块的redis.properties、jdbc.properties、generator.properties数据库连接等配置信息,其中master.redis.password、master.jdbc.password、slave.jdbc.password、generator.jdbc.password密码值使用了AES加密,请使用com.zheng.common.util.AESUtil工具类修改这些值
  • 启动Zookeeper、Redis、ActiveMQ、Nginx(配置文件参考project-tools/nginx下的*.conf文件)

zheng-upms

  • 首先启动 zheng-upms-rpc-service(直接运行src目录下的ZhengUpmsRpcServiceApplication#main方法启动) => zheng-upms-server(jetty),然后按需启动对应子系统xxx的zheng-xxx-rpc-service(main方法) => zheng-xxx-webapp(jetty)

启动演示

  • 访问 http://upms.zhangshuzheng.cn:1111/,子系统菜单已经配置到zheng-upms权限中,不用直接访问子系统,默认帐号密码:admin/123456
  • 登录成功后,可在右上角切换已注册系统访问

zheng-cms

  • zheng-cms-admin:启动ActiveMQ-启动 => 启动zheng-rpc-service => 启动zheng-cms-admin
  • zheng-cms-web:启动nginx代理zheng-ui静态资源,配置文件可参考 nginx.conf

zheng-oss

  • 首先启动zheng-oss-web服务
  • 开发阶段,如果zheng-oss-web没有公网域名,推荐使用ngrok内网穿透工具,为开发环境提供公网域名,实现上传回调
  • 启动nginx代理zheng-ui静态资源

开发演示(QQ群内有“zheng十分钟视频:从检出到启动.wmv”)

  • 创建数据表(建议使用PowerDesigner)
  • 直接运行对应项目dao模块中的generator.main(),可自动生成单表的CRUD功能和对应的model、example、mapper、service代码
    • 生成的model和example均已实现Serializable接口,支持分布式
    • 已包含抽象类BaseServiceImpl,只需要继承抽象类并传入泛型参数,即可默认实现mapper接口所有方法,特殊需求直接扩展即可
    • BaseServiceImpl默认已实现四种根据条件分页接口
      • selectByExampleWithBLOBsForStartPage()
      • selectByExampleForStartPage()
      • selectByExampleWithBLOBsForOffsetPage()
      • selectByExampleForOffsetPage()
    • BaseServiceImpl方法根据读写操作自动切换主从数据源,继承的扩展接口,可手动通过DynamicDataSource.setDataSource(DataSourceEnum.XXX.getName())指定数据源
  • 启动流程:优先rcp-service服务提供者,再启动其他webapp
  • 扩展流程:可扩展和拆分rpc-api和rpc-service模块,可按微服务拆分或场景拆分

部署方式(QQ群内有“zheng十分钟视频:从打包到linux服务器部署.wmv”)

  • war包项目:使用tomcat等web容器启动
  • rpc-service服务提供者jar包:将打包后的zheng-xxx-rpc-service-assembly.tar.gz文件解压,使用bin目录的管理脚本运行即可,支持优雅停机。

框架规范约定

约定优于配置(convention over configuration),此框架约定了很多编程规范,下面一一列举:


- service类,需要在叫名`service`的包下,并以`Service`结尾,如`CmsArticleServiceImpl`

- controller类,需要在以`controller`结尾的包下,类名以Controller结尾,如`CmsArticleController.java`,并继承`BaseController`

- spring task类,需要在叫名`task`的包下,并以`Task`结尾,如`TestTask.java`

- mapper.xml,需要在名叫`mapper`的包下,并以`Mapper.xml`结尾,如`CmsArticleMapper.xml`

- mapper接口,需要在名叫`mapper`的包下,并以`Mapper`结尾,如`CmsArticleMapper.java`

- model实体类,需要在名叫`model`的包下,命名规则为数据表转驼峰规则,如`CmsArticle.java`

- spring配置文件,命名规则为`applicationContext-*.xml`

- 类名:首字母大写驼峰规则;方法名:首字母小写驼峰规则;常量:全大写;变量:首字母小写驼峰规则,尽量非缩写

- springmvc配置加到对应模块的`springMVC-servlet.xml`文件里

- 配置文件放到`src/main/resources`目录下

- 静态资源文件放到`src/main/webapp/resources`目录下

- jsp文件,需要在`/WEB-INF/jsp`目录下

- `RequestMapping`和返回物理试图路径的url尽量写全路径,如:`@RequestMapping("/manage")`、`return "/manage/index"`

- `RequestMapping`指定method

- 模块命名为`项目`-`子项目`-`业务`,如`zheng-cms-admin`

- 数据表命名为:`子系统`_`表`,如`cms_article`

- 更多规范,参考[[阿里巴巴Java开发手册] http://git.oschina.net/shuzheng/zheng/attach_files

演示地址

演示地址: http://upms.zhangshuzheng.cn/

预览图

idea login upms cms swagger

数据模型

数据库模型

拓扑图

拓扑图

开发进度

开发进度

参与开发

首先谢谢大家支持,如果你希望参与开发,欢迎通过Github上fork本项目,并Pull Request您的commit。

常见问题

  • Eclipse下,dubbo找不到dubbo.xsd报错,不影响使用,如果要解决,可参考 http://blog.csdn.net/gjldwz/article/details/50555922
  • 报zheng-xxx.jar包找不到,请按照文档编译顺序,将源代码编译并安装到本地maven仓库
  • zheng-cms-admin启动卡住:因为没有启动activemq
  • zheng-upms-server访问报session不存在:因为没有启动redis服务
  • 界面没有样式:因为zheng-admin没有编译安装到本地仓库

附件

优秀文章和博客

在线小工具

在线文档

from:https://github.com/shuzheng/zheng/blob/master/README.md

消息队列设计精要

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。

本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面。过程中我们会参考这些成熟消息队列的很多重要思想。

本文首先会阐述什么时候你需要一个消息队列,然后以Push模型为主,从零开始分析设计一个消息队列时需要考虑到的问题,如RPC、高可用、顺序和重复消息、可靠投递、消费关系解析等。

也会分析以Kafka为代表的pull模型所具备的优点。最后是一些高级主题,如用批量/异步提高性能、pull模型的系统设计理念、存储子系统的设计、流量控制的设计、公平调度的实现等。其中最后四个方面会放在下篇讲解。

何时需要消息队列

当你需要使用消息队列时,首先需要考虑它的必要性。可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。

解耦

解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。

比如在美团旅游,我们有一个产品中心,产品中心上游对接的是主站、移动后台、旅游供应链等各个数据源;下游对接的是筛选系统、API系统等展示系统。当上游的数据发生变更的时候,如果不使用消息系统,势必要调用我们的接口来更新数据,就特别依赖产品中心接口的稳定性和处理能力。但其实,作为旅游的产品中心,也许只有对于旅游自建供应链,产品中心更新成功才是他们关心的事情。而对于团购等外部系统,产品中心更新成功也好、失败也罢,并不是他们的职责所在。他们只需要保证在信息变更的时候通知到我们就好了。

而我们的下游,可能有更新索引、刷新缓存等一系列需求。对于产品中心来说,这也不是我们的职责所在。说白了,如果他们定时来拉取数据,也能保证数据的更新,只是实时性没有那么强。但使用接口方式去更新他们的数据,显然对于产品中心来说太过于“重量级”了,只需要发布一个产品ID变更的通知,由下游系统来处理,可能更为合理。

再举一个例子,对于我们的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。

最终一致性

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。

业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。

以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。

然而,这个过程中存在很多可能的意外:

  1. A扣钱成功,调用B加钱接口失败。
  2. A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。
  3. A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。

可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨VM的一致性问题,从技术的角度讲通用的解决方案是:

  1. 强一致性,分布式事务,但落地太难且成本太高,后文会具体提到。
  2. 最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
  3. 回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。
  4. 整个这个模型依然可以基于RPC来做,但可以抽象成一个统一的模型,基于消息队列来做一个“企业总线”。
  5. 具体来说,本地事务维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。
  6. broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。
  7. 我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。

最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。

像Kafka一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

广播

消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。

比如本文开始提到的产品中心发布产品变更的消息,以及景点库很多去重更新的消息,可能“关心”方有很多个,但产品中心和景点库只需要发布变更消息即可,谁关心谁接入。

错峰与流控

试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。

这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。

对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。

支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。

当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。

如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

如何设计一个消息队列

综述

我们现在明确了消息队列的使用场景,下一步就是如何设计实现一个消息队列了。

 

基于消息的系统模型,不一定需要broker(消息队列服务端)。市面上的的Akka(actor模型)、ZeroMQ等,其实都是基于消息的系统设计范式,但是没有broker。

我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:

  1. 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
  2. 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
  3. 掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。

一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。

利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。

之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。

为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。

在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。

下面我们会以设计消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来具体分析设计实现一个消息队列时的方方面面。

实现队列基本功能

RPC通信协议

刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然需要消费端最终做消费确认的情况是三次RPC。既然是RPC,就必然牵扯出一系列话题,什么负载均衡啊、服务发现啊、通信协议啊、序列化协议啊,等等。在这一块,我的强烈建议是不要重复造轮子。利用公司现有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定义的框架也好。因为消息队列的RPC,和普通的RPC没有本质区别。当然了,自主利用Memchached或者Redis协议重新写一套RPC框架并非不可(如MetaQ使用了自己封装的Gecko NIO框架,卡夫卡也用了类似的协议)。但实现成本和难度无疑倍增。排除对效率的极端要求,都可以使用现成的RPC框架。

简单来讲,服务端提供两个RPC服务,一个用来接收消息,一个用来确认消息收到。并且做到不管哪个server收到消息和确认消息,结果一致即可。当然这中间可能还涉及跨IDC的服务的问题。这里和RPC的原则是一致的,尽量优先选择本机房投递。你可能会问,如果producer和consumer本身就在两个机房了,怎么办?首先,broker必须保证感知的到所有consumer的存在。其次,producer尽量选择就近的机房就好了。

高可用

其实所有的高可用,是依赖于RPC和存储的高可用来做的。先来看RPC的高可用,美团的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服务自动发现,负载均衡等功能。而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。

那么怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。就算有单点故障,其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。存储系统本身的可用性我们不需要操太多心,放心大胆的交给DBA们吧!

对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。需要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且需要做数据的同步,关于这块HA的细节,可以参考下篇pull模型消息系统设计。

服务端承载消息堆积的能力

消息到达服务端如果不经过任何处理就到接收者了,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。

只是这个存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。

持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。

但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。

市面上的消息队列普遍两种形式都支持。当然具体的场景还要具体结合公司的业务来看。

存储子系统的选择

我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上,从速度来看,文件系统>分布式KV(持久化)>分布式文件系统>数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择,如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。

但是DB受制于IOPS,如果要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。整体上可以采用数据文件+索引文件的方式处理,具体这块的设计比较复杂,可以参考下篇的存储子系统设计。

分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。

消费关系解析

现在我们的消息队列初步具备了转储消息的能力。下面一个重要的事情就是解析发送接收关系,进行正确的消息投递了。

市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。抛开现象看本质,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。当然,对于互联网的大部分应用来说,组间广播、组内单播是最常见的情形。

消息需要通知到多个业务集群,而一个业务集群内有很多台机器,只要一台机器消费这个消息就可以了。

当然这不是绝对的,很多时候组内的广播也是有适用场景的,如本地缓存的更新等等。另外,消费关系除了组内组间,可能会有多级树状关系。这种情况太过于复杂,一般不列入考虑范围。所以,一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址+端口),则广播。

至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如config server、zookeeper等。维护广播关系所要做的事情基本是一致的:

  1. 发送关系的维护。
  2. 发送关系变更时的通知。

队列高级特性设计

上面都是些消息队列基本功能的实现,下面来看一些关于消息队列特性相关的内容,不管可靠投递/消息丢失与重复以及事务乃至于性能,不是每个消息队列都会照顾到,所以要依照业务的需求,来仔细衡量各种特性实现的成本,利弊,最终做出最为合理的设计。

可靠投递(最终一致性)

这是个激动人心的话题,完全不丢消息,究竟可不可能?答案是,完全可能,前提是消息可能会重复,并且,在异常情况下,要接受消息的延迟。

方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。

具体来说:

  1. producer往broker发送消息之前,需要做一次落地。
  2. 请求到server后,server确保数据落地后再告诉客户端发送成功。
  3. 支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。

对于各种不确定(超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到),其实对于发送方来说,都是一件事情,就是消息没有送达。

重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦,你必须要面对一个。好在消息重复还有处理的机会,消息丢失再想找回就难了。

Anyway,作为一个成熟的消息队列,应该尽量在各个环节减少重复投递的可能性,不能因为重复有解决方案就放纵的乱投递。

最后说一句,不是所有的系统都要求最终一致性或者可靠投递,比如一个论坛系统、一个招聘系统。一个重复的简历或话题被发布,可能比丢失了一个发布显得更让用户无法接受。不断重复一句话,任何基础组件要服务于业务场景。

消费确认

当broker把消息投递给消费者后,消费者可以立即响应我收到了这个消息。但收到了这个消息只是第一步,我能不能处理这个消息却不一定。或许因为消费能力的问题,系统的负荷已经不能处理这个消息;或者是刚才状态机里面提到的消息不是我想要接收的消息,主动要求重发。

把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。所以,允许消费者主动进行消费确认是必要的。当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。

对于正确消费ack的,没什么特殊的。但是对于reject和error,需要特别说明。reject这件事情,往往业务方是无法感知到的,系统的流量和健康状况的评估,以及处理能力的评估是一件非常复杂的事情。举个极端的例子,收到一个消息开始build索引,可能这个消息要处理半个小时,但消息量却是非常的小。所以reject这块建议做成滑动窗口/线程池类似的模型来控制,

消费能力不匹配的时候,直接拒绝,过一段时间重发,减少业务的负担。

但业务出错这件事情是只有业务方自己知道的,就像上文提到的状态机等等。这时应该允许业务方主动ack error,并可以与broker约定下次投递的时间。

重复消息和顺序消息

上文谈到重复消息是不可能100%避免的,除非可以允许丢失,那么,顺序消息能否100%满足呢? 答案是可以,但条件更为苛刻:

  1. 允许消息丢失。
  2. 从发送方到服务方到接受者都是单点单线程。

所以绝对的顺序消息基本上是不能实现的,当然在METAQ/Kafka等pull模型的消息队列中,单线程生产/消费,排除消息丢失,也是一种顺序消息的解决方案。

一般来讲,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽量减少重复消息,不保证消息的投递顺序。

谈到重复消息,主要是两个话题:

  1. 如何鉴别消息重复,并幂等的处理重复消息。
  2. 一个消息队列如何尽量减少重复消息的投递。

先来看看第一个话题,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,如果有地方记录这个MessageId,消息到来是能够进行比对就

能完成重复的鉴定。数据库的唯一键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种原因投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是异常情况下才会发生的,毕竟是小众情况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:

  1. 版本号。
  2. 状态机。

版本号

举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。

但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。

每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。

如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。

参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。

如果到来的顺序是21,则先把2存起来,待2到来后,再处理1,这样重复性和顺序性要求就都达到了。

状态机

基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:

  1. 对发送方必须要求消息带业务版本号。
  2. 下游必须存储消息的版本号,对于要严格保证顺序的。

还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。而且必须要对此做出处理。试想一个永不过期的”session”,比如一个物品的状态,会不停流转于上下线。那么中间环节的所有存储

就必须保留,直到在某个版本号之前的版本一个不丢的到来,成本太高。

就刚才的场景看,如果消息没有版本号,该怎么解决呢?业务方只需要自己维护一个状态机,定义各种状态的流转关系。例如,”下线”状态只允许接收”上线”消息,“上线”状态只能接收“下线消息”,如果上线收到上线消息,或者下线收到下线消息,在消息不丢失和上游业务正确的前提下。要么是消息发重了,要么是顺序到达反了。这时消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发一定要有次数限制,比如5次,避免死循环,就解决了。

举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。

那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。

此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。

中间件对于重复消息的处理

回归到消息队列的话题来讲。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该做的、哪些是消息队列不该做业务方处理的呢?其实这里没有一个完全严格的定义,但回到我们的出发点,我们保证不丢失消息的情况下尽量少重复消息,消费顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,我们要做的是减少消息发送的重复。

我们无法定义业务方的业务版本号/状态机,如果API里强制需要指定版本号,则显得过于绑架客户了。况且,在消费方维护这么多状态,就涉及到一个消费方的消息落地/多机间的同步消费状态问题,复杂度指数级上升,而且只能解决部分问题。

减少重复消息的关键步骤:

  1. broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。
  2. 对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。

事务

持久性是事务的一个特性,然而只满足持久性却不一定能满足事务的特性。还是拿扣钱/加钱的例子讲。满足事务的一致性特征,则必须要么都不进行,要么都能成功。

解决方案从大方向上有两种:

  1. 两阶段提交,分布式事务。
  2. 本地事务,本地落地,补偿发送。

分布式事务存在的最大问题是成本太高,两阶段提交协议,对于仲裁down机或者单点故障,几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用,没有办法承受这么高的网络延迟,系统复杂性。

并且成熟的分布式事务一定构建与比较靠谱的商用DB和商用中间件上,成本也太高。

那如何使用本地事务解决分布式事务的问题呢?以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操作同一个事务里,将消息插入本地数据库。如果消息入库失败,则业务回滚;如果消息入库成功,事务提交。

然后发送消息(注意这里可以实时发送,不需要等定时任务检出,以提高消息实时性)。以后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就一直靠定时任务重试。

这里有一个关键的点,本地事务做的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。这里很多人容易混淆,如果是后者,无疑是事务嵌套RPC,是大忌,会有长事务死锁等各种风险。

而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理损坏除外)。而消息只要投递到服务端确认后本地才做删除,就完成了producer->broker的可靠投递,并且当消息存储异常时,业务也是可以回滚的。

本地事务存在两个最大的使用障碍:

  1. 配置较为复杂,“绑架”业务方,必须本地数据库实例提供一个库表。
  2. 对于消息延迟高敏感的业务不适用。

话说回来,不是每个业务都需要强事务的。扣钱和加钱需要事务保证,但下单和生成短信却不需要事务,不能因为要求发短信的消息存储投递失败而要求下单业务回滚。所以,一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。业务方只需要使用@Transactional标签即可。

性能相关

异步/同步

首先澄清一个概念,异步,同步和oneway是三件事。异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;同步是需要当时关心

的结果的;而oneway是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。

回归来看,任何的RPC都是存在客户端异步与服务端异步的,而且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。

对于客户端来说,同步与异步主要是拿到一个Result,还是Future(Listenable)的区别。实现方式可以是线程池,NIO或者其他事件机制,这里先不展开讲。服务端异步可能稍微难理解一点,这个是需要RPC协议支持的。参考servlet 3.0规范,服务端可以吐一个future给客户端,并且在future done的时候通知客户端。整个过程可以参考下面的代码:

客户端同步服务端异步。

 

Future future = request(server);//server立刻返回future

synchronized(future){

while(!future.isDone()){

 future.wait();//server处理结束后会notify这个future,并修改isdone标志

}

}

return future.get();

 

客户端同步服务端同步。

Result result = request(server);

客户端异步服务端同步(这里用线程池的方式)。

Future future = executor.submit(new Callable(){public void call(){
    result = request(server);
}})
return future;

客户端异步服务端异步。

Future future = request(server);//server立刻返回future


return future

上面说了这么多,其实是想让大家脱离两个误区:

  1. RPC只有客户端能做异步,服务端不能。
  2. 异步只能通过线程池。

那么,服务端使用异步最大的好处是什么呢?说到底,是解放了线程和I/O。试想服务端有一堆I/O等待处理,如果每个请求都需要同步响应,每条消息都需要结果立刻返回,那么就几乎没法做I/O合并

(当然接口可以设计成batch的,但可能batch发过来的仍然数量较少)。而如果用异步的方式返回给客户端future,就可以有机会进行I/O的合并,把几个批次发过来的消息一起落地(这种合并对于MySQL等允许batch insert的数据库效果尤其明显),并且彻底释放了线程。不至于说来多少请求开多少线程,能够支持的并发量直线提高。

来看第二个误区,返回future的方式不一定只有线程池。换句话说,可以在线程池里面进行同步操作,也可以进行异步操作,也可以不使用线程池使用异步操作(NIO、事件)。

回到消息队列的议题上,我们当然不希望消息的发送阻塞主流程(前面提到了,server端如果使用异步模型,则可能因消息合并带来一定程度上的消息延迟),所以可以先使用线程池提交一个发送请求,主流程继续往下走。

但是线程池中的请求关心结果吗?Of course,必须等待服务端消息成功落地,才算是消息发送成功。所以这里的模型,准确地说事客户端半同步半异步(使用线程池不阻塞主流程,但线程池中的任务需要等待server端的返回),server端是纯异步。客户端的线程池wait在server端吐回的future上,直到server端处理完毕,才解除阻塞继续进行。

总结一句,同步能够保证结果,异步能够保证效率,要合理的结合才能做到最好的效率。

批量

谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:

  1. 攒够了一定数量。
  2. 到达了一定时间。
  3. 队列里有新的数据到来。

对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。

在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的
   public void run(){
      List messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息
   }
});
public void send(Message message){
    queue.offer(message);
    executor.submit(task)
}

这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue queue = new ArrayBlockingQueue<>();
volatile long last = System.currentMills();
Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
   flush();
},500,500,TimeUnits.MILLS);
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。
   public void run(){
      List messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。
   }
});
public void send(Message message){
    last = System.currentMills();
    queue.offer(message);
    flush();
}
private void flush(){
 if(queue.size>200||System.currentMills()-last>200){
       executor.submit(task)
  }
}

相反对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想,既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限。

具体说来,在上文的submit之前,多判断一个时间和数量,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。

最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:

  1. 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
  2. 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。

push还是pull

上文提到的消息队列,大多是针对push模型的设计。现在市面上有很多经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。

我们简要分析下push和pull模型各自存在的利弊。

慢消费

慢消费无疑是push模型最大的致命伤,穿成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。

反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。

消息延迟与忙等

这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。

但等待多久就很难判定了。你可能会说,我可以有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生,

可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧?

当然也不是说延迟就没有解决方案了,业界较成熟的做法是从短时间开始(不会对broker有太大负担),然后指数级增长等待。比如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。

即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。

在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~

顺序消息

如果push模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消费确认后才能发下一条消息,这对于本身堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。

反观pull模式,如果想做到全局顺序消息,就相对容易很多:

  1. producer对应partition,并且单线程。
  2. consumer对应partition,消费确认(或批量确认),继续消费即可。

所以对于日志push送这种最好全局有序,但允许出现小误差的场景,pull模式非常合适。如果你不想看到通篇乱套的日志~~

Anyway,需要顺序消息的场景还是比较有限的而且成本太高,请慎重考虑。

总结

本文从为何使用消息队列开始讲起,然后主要介绍了如何从零开始设计一个消息队列,包括RPC、事务、最终一致性、广播、消息确认等关键问题。并对消息队列的push、pull模型做了简要分析,最后从批量和异步角度,分析了消息队列性能优化的思路。下篇会着重介绍一些高级话题,如存储系统的设计、流控和错峰的设计、公平调度等。希望通过这些,让大家对消息队列有个提纲挈领的整体认识,并给自主开发消息队列提供思路。另外,本文主要是源自自己在开发消息队列中的思考和读源码时的体会,比较不”官方”,也难免会存在一些漏洞,欢迎大家多多交流。

后续我们还会推出消息队列设计高级篇,内容会涵盖以下方面:

  • pull模型消息系统设计理念
  • 存储子系统设计
  • 流量控制
  • 公平调度

敬请期待哦~

作者简介

王烨,现在是美团旅游后台研发组的程序猿,之前曾经在百度、去哪和优酷工作过,专注Java后台开发。对于网络编程和并发编程具有浓厚的兴趣,曾经做过一些基础组件,也翻过一些源码,属于比较典型的宅男技术控。期待能够与更多知己,在coding的路上并肩前行~

from:https://zhuanlan.zhihu.com/p/21649950

 

架构技术实践系列文章