Category Archives: Microservices

微服务在微信的架构实践

微服务的理念与腾讯一直倡导的“大系统小做”有很多相通之处,本文将分享微信后台架构的服务发现、通信机制、集群管理等基础能力与其上层服务划分原则、代码管理规则等。
背景介绍

首先,我们需要敏捷开发。过去几年,微信都是很敏捷地在开发一些业务。所以我们的底层架构需要支撑业务的快速发展,会有一些特殊的需求。

另外,目前整个微信团队已经有一千多人了,开发人员也有好几百。整个微信底层框架是统一的,微信后台有千级模块的系统。比如说某某服务,有上千个微服务在跑,而集群机器数有几万台,那么在这样的规模下,我们会有怎么样的挑战呢?

我们一直在说“大系统小做”,联想一下,微服务与腾讯的理念有哪些相同与不同的地方呢?通过对比,最终发现还是有许多相通的地方。所以我挑出来讲讲我们的实践。

看过过去几个会议的内容,可能大家会偏向于讲整一个大的框架,比如整个云的架构。但是我这边主要讲的是几个特殊的点。

概览详情

开始看一下我们的结构。全球都有分布,主要有上海、深圳、香港、加拿大几个数据中心。

其中上海服务国内北方的用户,深圳负责南方用户,加拿大服务北美、南美和欧洲,香港服务东南亚、中东和非洲地区。

然后来看看我们的架构:

  • 最上边是我们的产品;
  • 然后有一个号称几亿在线的长连接和短连接的服务;
  • 中间有一个逻辑层,后台框架讲的主要是逻辑层往后这块,包括我们的 RPC、服务分组、过载保护与数据存储;
  • 底层有个 PaxosStore 存储平台。

整套就是这么个体系。微服务很容易去构建,但是规模变大后有哪些问题,需要哪些能力?这里挑出三个点来讲一下:

一、敏捷

希望你的服务很快实现,不太多去考虑。像我们早期互联网业务,甚至包括 QQ 等,我们很注重架构师的一个能力,他需要把握很多的东西。他设置每个服务的时候,要先算好很多资源,算好容灾怎么做。容灾这个问题直接影响业务怎么去实现的,所以有可能你要做一个具体逻辑的时候要考虑很多问题,比如接入服务、数据同步、容灾等等每个点都要考虑清楚,所以节奏会慢。

二、容错

当你的机器到了数万台,那每天都有大量机器会有故障。再细一点,可以说是每一个盘的故障更频繁一点。

三、高并发

基础架构

接下来看看我们的基础架构。

整个微服务的架构上,我们通常分成这些部分:

  • 服务布局
  • 服务之间怎么做一些远程调用
  • 容错(主要讲一下过载保护)
  • 部署管理
服务布局

分两层,一个是城市间。城市之间的数据是相对独立的,除了少数账号全球同步,大部分业务都希望做成电子邮件式的服务,各自有自身的环境在跑,之间使用类似于电子邮件的通信。所以我们选择让每个城市自治,它们之间有一个 200-400ms 的慢速网络,国内会快点,30ms。

而城市内部,就是每个园区是一套独立的系统,可以互相为对方提供备份。要求独立的电源与网络接入。

城市内部会有整套的划分,终端 –>接入层 –>逻辑层 –>存储层 都是完全独立的一套系统。

远程调用

看到很多框架,竟然是没有协程的,这很诧异。早年我们 QQ 邮箱、微信、图像压缩、反垃圾都是一个 web 服务,只有存储层会独立到后面去,甚至用 web 直连 MySQL。因为它早期比较小,后来变大之后就用微服务架构。

每个东西都变成一个小的服务,他们是跨机的。你可以想象一下,每天我们很多人买早餐的时候,掏出手机做一个微信支付,这一个动作在后台会引起上百次的调用。这有一个复杂的链路。在 2014 年之前,我们微信就是没有做异步的,都是同步的,在这么多调用里,A 服务调用 B,那要先等它返回,这样就占住了一条进程或者线程。所以其实 13 年的时候,我们发生了大大小小的故障,很大一部分原因就在这里。

然后 13 年底的时候,这个问题太严重了,严重到,比如发消息的时候,你去拿一个头像之类的,它只要抖动,就可能引发整一条调用链的问题,并且因为过程保护的不完善,它会把整个消息发送的曲线掉下去,这是我们很痛苦的时间。

然后当时我们就去考虑这些方案,13 年的时候抽出 3 个人重新做了一个完整的库 libco。(两千行),实现时间轮盘与事件处理链、常用网络编程模式、同步原语等。它分为三大块,事件驱动、网络 HOOK 和协程机制。

早期是多进程为主,当年切多线程的时候,也遇到一大波修改,后来线程里有了一个线程变量就好多了。如果没有这个东西,你可能要把许多变量改成参数再一层一层传递下去。有了线程变量就好多了。现在我们的协程变量也是这个意义,效果就像写一个宏一样。

另一个是,我们支持 CGI,早期库在 CGI 上遇到问题,所以没有推广。因为一个标准 CGI 服务是基于一些古老的接口的,像 getENV、setENV,就是说你的 coreString 是通过 ENV 来得到的,那么这个我们也把它给 HOOK 掉了,它会根据你的协程去分派。

最难的一个是 gethostbyname 方法,我发现很多人就连在异步编程里,处理 hostbyname 也可能是用了一套独立线程去做,或者你很辛苦地把整个代码抠出来重新写一遍,这个肯定是有很多问题的。所以我们 libco 就把这个 gethostbyname 给完整地支持了。

最后如果你还不爽,说一般业务逻辑可以这么干,那我还有很多后台代码怎么办呢?很多有经验的老的程序员可能要拿着他们那一堆很复杂的异步编程的代码来质疑我们,他们不认为他们的代码已经完全可以被协程所取代了。

他们有如下两个质疑:

  • 质疑性能:协程有很多切换,会不会带来更大开销?
  • 你可能处理几万并发就好,消耗个 1G 内存就行,但是我们这里是处理千万并发哦,这么大的规模,我不信任你这个东西。

这样我们其实是面临了一个问题,因为一些老代码,越是高级的人写的,它的技术栈越深,稍微改动一点代码,就出 BUG 了。

所以我们后来做了两个东西,一个是实际修改了相对简单的异步代码到 libco 里,然后性能更好了。因为在做异步编程的时候,你需要自己去维护很多的数据结构,做你的状态保存,它们的生存期有可能需要很久,你自然地会分配许多内存给它,当然你会用一些内存池去优化它,但是这些是有限的。

但是你用协程的话,很多变量就自然在一个连续的内存里了,相当于一个小的内存池,就比如 if……else……这个你没有必要去 new 一个东西保存状态的,直接放在栈里就行了,所以它的性能更好了。

第二个是,它要求很高的并发。由于协程要一个栈,我们一般开 128k,如果你对这个代码掌控得比较好,可能开 16k,就算是这样,你要开 1 万个协程,还是要 100 多 M 的内存。所以我们后来就在这基础上做了一个可以支持千万连接的协程模式。

Libco 是一个底层库,让你很方便开发,但是大部分开发人员不是直接面对 libco 的,我们花了一年时间把整个微信后台绝大部分逻辑服务、存储服务改成基于 libco,整个配置就直接通过配一台机器上的并发数配 10 倍甚至 20、30 倍,这样子就一下子把整个问题解决了。

过载保护

并发数上去后容易引发另一个问题,早期的时候,后端服务性能高,逻辑服务性能相对弱,很容易被 hold,不可能给后端发起很多连接,不具有“攻击性”,但修改完成后,整个前端变得很强,那可能对后端产生很大的影响。这个时候就要来考虑一下过载保护了。

一般会提到几个点。

1.轻重分离:

就是一个服务里边不要又有重的操作,又有轻的,这样过载的时候,大量的请求都被某些小请求拦截掉了,资源被占满了。

2.队列:

过载保护一般是说系统内部服务在做过去的事情,做无用功。它们可能待在某个队列里边,比如服务时间要求 100ms,但它们总是在做 1s 以前的任务,所以整个系统会崩溃。所以老的架构师会注重说配好每一个服务的队列长度,估算好。但是在繁忙的开发中,是很难去控制的。

3.组合命令式:

后端服务并不是只有一个,上边这个图中的例子,想要调用很多服务,然后 AB 都过载,它们每一个其实都只是过载一点,通过率可达到 80%,但是前端需要这两个服务的组合服务,那么这里就可能只能达到 60% 的通过率。然后后边如果是更多的服务,那么每个服务的一点点过载,到了前端就是很严重的问题。怎么解决呢?

这本书在 12、13 年的时候很火,里边提到了两个对我们有用的点。

  • 一个是“希望系统是分布式的,去中心化”,指系统过载保护依赖每一个节点自身的情况去做,而不是下达一个统一的中心指令。
  • 二是“希望整个控制是基于反馈的”,它举了一些例子,像抽水马桶,像过去炼钢铁的参数很难配,但是只要有一个反馈机制就好解决了。

于是我们构建了一套看起来有点复杂的过载保护系统。

整个系统基于反馈,然后它把整个拒绝的信息全程传递了。看到最右边,有几个典型的服务,从一个 CGI 调用一个后台服务,再调用另一个后台服务,它会在 CGI 层面就把它的重要程度往下传。回到刚才那个前端调用 A、B 服务的例子,使用这样的一种重要程度传递,就可以直接拒绝那些相同用户的 20% 的请求,这样就解决了这个问题。

 怎么配队列?

这个只是反映了生产者和服务者处理能力的差异,观察这个差异,就可以得到一个好的拒绝的数。你不需要去配它多长,只需要去看一个请求在队列里待的平均时间是否可以接受,是一个上涨趋势还是一个下降趋势。这样我们就可以决定要不要去拒绝。那这样几乎是全自动的。你只要配得相对大一点就行了,可以抗一些抖动。在接入之前就评估它,在过去一段时间内平均队列耗时多长,如果超过预支,我们就往下调。这样就把整个系统的过载能力提升了很多。

这是一个具体的做法,我们会考虑两个维度,一个是后台服务,可能服务很多不同的前端,它可能来源于一个支付的请求,经过层层调用,到达后台;或者是一个发消息的服务;它也可能是一个不重要的小服务,如果这个账户服务过载的时候,那么我们可以根据这个表来自动地优先去拒绝一些不那么重要的服务请求,使得我们核心服务能力可以更好地提供。这样整个系统就可以做到很好的过载保护。

数据存储

上边提到一个数据层,那我们是怎么去做数据的呢?

在过去很多年里,我们可能是尽可能去事务化、不追求强一致,一般是采用主备同步的方法。但我们的目标还是强一致的存储。

强一致是说,写一个数据之后,服务器的返回成功不会因为单机故障而丢失。早年我们用的是自己设计的协议,严格来证明的话,没有 Paxos 这么严谨,所以我们在过去一年多的时间内,重新做了一个 Paxos 存储。

它是一个同步复制的数据存储,支持各个园区之间的数据一致性,并且是可以多组多写的,就是说任何一个园区接入,它都可以进行数据的强制读写。另外它并不只是 key-value 模式,它支持 key-value、list、表。在微信这边很少会说完全依赖 key-value 的,因为很多业务都是有列表、表格等的请求,所以很多年前就开始用表格的存储。

Paxos 可用性很高,所以我们就敢做单表有亿行的设计,这样像公众号粉丝等需要很大的,几千万甚至几亿行的记录,就不用考虑自己去分表。并且这个存储可以使用类 SQL 的语句去做,它是完全保证事务的。

它还是插件化系统,不仅支持 LSM,还支持其它存储引擎。

然后它低成本,后台 CPU 有 E3-1230V3,也有 E5-2670 型号的,内存,CPU 与 ssd 之间有一些能力用不上,所以我们系统是可以灵活组合很多不同存储介质的。

这个系统是跑在同城的,也就是上海内部、深圳内部、加拿大内部和香港内部。它们之间的延迟相对较低,几毫秒的级别。这是一个非租约的,没有 leader,不存在切换的不可用期,随时都可以切换任何一个园区。负载均衡这一块我们沿用 kb64 架构,6 台机为一组。因为园区故障少,平时单机时,分摊 25% 的流量,整体比较稳定。6 台为一组时,整个作为一个 set,有很多 set 之间的适用一致性要去做,会有一个很细粒度的伸缩性,比如它可以 100 组扩展到 101 组。

 为什么用这么重的方式呢?

因为希望应用是 简单快速 的,不用假设一个数据写完之后还可能被回退掉,这样只会有很多额外的开销,会有很多问题。比如公众号,他们有很多素材库之类的很重要的存储,如果数据突然丢了,或者说回退了,没有了,那用户投诉是会很严重的。微信账号这边也是这样,如果一个账户注册了,但是这个数据回退了,那也是很严重的问题。

另一个原因是 可用性。在一个传统的主备系统里面,当主机挂掉,面临切不切备机的抉择,然后你会层层请示,说明目前的同步状况,甚至你不知道当前的同步状况,经过很多流程来请示是否切换备机。

而另外,它也不是一个高成本的方案。

 为什么不用 Raft 呢?

Raft 的开源很有价值,它把互联网后台的数据一致性能力提升了很多,就算是一个很小的团队,它也能直接用 Raft 获得一个强一致能力,而这可能就已经超过了许多互联网后台的强一致能力,因为很多后台都是用了很古老的架构,比如长期用到主机架构。

 Raft 与 Paxos 的区别是什么呢?

其实 Raft 和 Paxos 不是一个层面的概念,这个图就是典型的通过一个 log 变更 db 的架构,通过三条 log 一致性做到数据持久强一致性。那 Paxos 在哪里?在一个 log 的某一个 entry 那边,三个点构成一个常量。

那 Raft 是什么呢?它是整一个二维的东西,就是说,基于一个 Paxos 强一致协议做的一条 log,它整个就是一个 Raft。所以我们可以认为 Raft 其实是 Paxos(log)的一种选择。如果你允许绿色部分不存在,那它就不是 Raft。因为 Raft 的设计是你自己做的,它与 Paxos 没关系。

整个 PaxosStore 架构如图:

它包含了很多层,包括缓存和汇聚层、同步复制的组件等。

这一套方案是在线上用了好几千台的,是一个非租约的方案。存储引擎可以自由定制。如果想用大表,那可以用 leveldb。如果想用更强的 LSM,也可以选择。然后我们也有很多 Bitcask 的模型,更适合于内存的 key-value。

由于有几万台机,所以变很重要,我们也基于 BT 做了一套存储方案。它会以园区为根据地,通常一个变更,会以 BT 协议发送到每个园区里,然后园区内部把同机架机器分成一个分组,然后分组内再互传。就我了解,Facebook 和 Twitter、Ebay 都是这样做的。

作者介绍

许家滔,2005 年加入腾讯,见证 QQ 邮箱从百万到数亿用户的整个敏捷开发过程以及架构变迁。2011 年起负责微信后台基础架构,包括分布式存储平台和后台服务框架等,覆盖微信账号 / 消息 / 朋友圈核心存储等,并为公众号 / 微信支付 / 微信企业号等等业务提供组件支持,近两年专注于后台服务质量提升和高性能架构,在数千台机器上面构建了海量高并发 Paxos 存储系统,同时是开源软件 Tencent/libco 负责人。

Overload control for scaling WeChat microservices

Overload control for scaling WeChat microservices Zhou et al., SoCC’18

There are two reasons to love this paper. First off, we get some insights into the backend that powers WeChat; and secondly the authors share the design of the battle hardened overload control system DAGOR that has been in production at WeChat for five years. This system has been specifically designed to take into account the peculiarities of microservice architectures. If you’re looking to put a strategy in place for your own microservices, you could do a lot worse than start here.

WeChat

The WeChat backend at this point consists of over 3000 mobile services, including instant messaging, social networking, mobile payment, and third-party authorization. The platform sees between 10^{10} - 10^{11} external requests per day. Each such request can triggers many more internal microservice requests, such that the WeChat backend as a whole needs to handle hundreds of millions of requests per second.

WeChat’s microservice system accommodates more than 3000 services running on over 20,000 machines in the WeChat business system, and these numbers keep increasing as WeChat is becoming immensely popular… As WeChat is ever actively evolving, its microservice system has been undergoing fast iteration of service updates. For instance, from March to May in 2018, WeChat’s microservice system experienced almost a thousand changes per day on average.

WeChat classify their microservices as “Entry leap” services (front-end services receiving external requests), “Shared leap” services (middle-tier orchestration services), and “Basic services” (services that don’t fan out to any other services, and thus act as sinks for requests).

On a typical day, peak request rate is about 3x the daily average. At certain times of year (e.g. around the Chinese Lunar New Year) peak workload can rise up to 10x the daily average.

Challenges of overload control for large-scale microservice-based platforms

Overload control… is essential for large-scale online applications that need to enforce 24×7 service availability despite any unpredictable load surge.

Traditional overload control mechanisms were designed for a world with a small number of service components, a relatively narrow ‘front-door,’ and trivial dependencies.

… modern online services are becoming increasingly complex in their architecture and dependencies, far beyond what traditional overload control was designed for.

  • With no single entry point for service requests sent to the WeChat backend, the conventional approach of centralized load monitoring at a global entry point (gateway) is not applicable.
  • The service invocation graph for a particular request may depend on request-specific data and service parameters, even for requests of the same type. So when a particular service becomes overload it is very difficult to determine what types of requests should be dropped to mitigate the situation.
  • Excessive request aborts (especially when deeper in the call graph or later in the request processing) waste computational resources and affect user experience due to high latency.
  • Since the service DAG is extremely complex and continuously evolving, the maintenance cost and system overhead for effective cross-service coordination is too high.

Since one service may make multiple requests to a service it depends on, and may also make requests to multiple backend services, we have to take extra care with overload controls. The authors coin the term subsequent overload for the cases where more than one overloaded service is invoked, or a single overloaded service is invoked multiple times.

Subsequent overload raises challenges for effective overload control. Intuitively, performing load shedding at random when a service becomes overloaded can sustain the system with a saturated throughput. However, subsequent overload may greatly degrade system throughput beyond that intended…

Consider a simple scenario where service A invokes service B twice. If B starts rejecting half of all incoming requests, A’s probability of success drops to 0.25.

DAGOR overview

WeChat’s overload control system is called DAGOR. It aims to provide overload control to all services and thus is designed to be service agnostic. Overload control runs at the granularity of an individual machine, since centralised global coordination is too expensive. However, it does incorporate a lightweight collaborative inter-machine protocol which is needed to handle subsequent overload situations. Finally, DAGOR should sustain the best-effort success rate of a service when load shedding becomes inevitable due to overload. Computational resources (e.g. CPU, I/O) spent on failed service tasks should be minimised.

We have two basic tasks to address: detecting an overload situation, and deciding what to do about it once detected.

Overload detection

For overload detection, DAGOR uses the average waiting time of requests in the pending queue (i.e., queuing time). Queuing time has the advantage of negating the impact of delays lower down in the call-graph (compared to e.g. request processing time). Request processing time can increase even when the local server itself is not overloaded. DAGOR uses window-based monitoring, where a window is one second or 2000 requests, whichever comes first. WeChat clearly run a tight ship:

For overload detection, given the default timeout of each service task being 500ms in WeChat, the threshold of the average request queuing time to indicate server overload is set to 20ms. Such empirical configurations have been applied in the WeChat business system for more than five years with its effectiveness proven by the system robustness with respect to WeChat business activities.

Admission control

Once overload is detected, we have to decide what to do about it. Or to put things more bluntly, which requests we’re going to drop. The first observation is that not all requests are equal:

The operation log of WeChat shows that when WeChat Pay and Instant Messaging experience a similar period of service unavailability, user complaints against the WeChat Pay service are 100x those against the Instant Messaging service.

To deal with this in a service agnostic way, every request is assigned a business priority when it first enters the system. This priority flows with all downstream requests. Business priority for a user request is determined by the type of action requested. Although there are hundreds of entry points, only a few tens have explicit priority, all the others having a default (lower) priority. The priorities are maintained in a replicated hashtable.

When overload control is set to business priority level n, all requests from levels n+1 will be dropped. That’s great for mixed workloads, but suppose we have a flood of Payment requests, all at the same priority (e.g. p). The system will become overloaded, and hence move the overload threshold to p-1, when it will become lightly loaded again. Once light load is detected, the overload threshold is incremented to p again, and once more we are in overload. To stop this flip-flipping when overloaded with requests at the same priority level, we need a level of granularity finer than business priority.

WeChat has a neat solution to this. It adds a second layer of admission control based on user-id.

User priority is dynamically generated by the entry service through a hash function that takes the user ID as an argument. Each entry service changes its hash function every hour. As a consequence, requests from the same user are likely to be assigned to the same user priority within one hour, but different user priorities across hours.

This provides fairness while also giving an individual user a consistent experience across a relatively long period of time. It also helps with the subsequent overload problem since requests from a user assigned high priority are more likely to be honoured all the way through the call graph.

Combining business priority and user priority gives a compound admission level with 128 levels of user priority per business priority level.

With each admission level of business priority having 128 levels of user priority, the resulting number of compound admission levels is in the tens of thousands. Adjustment of the compound admission level is at the granule of user priority.

There’s a nice sidebar on why using session ID instead of user ID doesn’t work: you end up training users to log out and then log back in again when they’re experiencing poor service, and now you have a login storm on top of your original overload problem!

DAGOR maintains a histogram of requests at each server to track the approximate distribution of requests over admission priorities. When overload is detected in a window period, DAGOR moves to the first bucket that will decrease expected load by 5%. With no overload, it moves to the first bucket that will increase expected load by 1%.

A server piggy-backs its current admission level on each response message sent to upstream servers. In this way an upstream server learns the current admission control setting of a downstream service, and can perform local admission control on the request before even sending it.

End-to-end therefore, the DAGOR overload control system looks like this:

Experiments

The best testimony to the design of DAGOR is that it’s been working well in production at WeChat for five years. That doesn’t provide the requisite graphs for an academic paper though, so we also get a set of simulation experiments. The following chart highlights the benefits of overload control based on queuing time rather than response time. The benefits are most pronounced in situations of subsequent overload (chart (b)).

Compared to CoDel and SEDA, DAGOR exhibits a 50% higher request success rate with subsequent overloading when making one subsequent call. The benefits are greater the higher the number of subsequent requests:

Finally, in terms of fairness CoDel can be seen to favour services with smaller fan-out to overloaded services when under stress, whereas DAGOR manifests roughly the same success rate across a variety of requests.

Three lessons for your own systems

Even if you don’t use DAGOR exactly as described, the authors conclude with three valuable lessons to take into consideration:

  • Overload control in a large-scale microservice architecture must be decentralized and autonomous in each service
  • Overload control should take into account a variety of feedback mechanisms (e.g. DAGOR’s collaborative admission control) rather than relying solely on open-loop heuristics
  • Overload control design should be informed by profiling the processing behaviour of your actual workloads.

中文版链接: https://www.tuicool.com/articles/aYJjMvN

from:https://blog.acolyer.org/2018/11/16/overload-control-for-scaling-wechat-microservices/

消息系统在微服务间通讯的数据一致性

微服务是当下的热门话题,今天来聊下微服务中的一个敏感话题:如何保证微服务的数据一致性。谈到分布式事务,就避免不了CAP理论。

 

CAP理论是指对于一个分布式计算系统来说,不可能同时满足以下三点:

1. 一致性(Consistence) (等同于所有节点访问同一份最新的数据副本)

2. 可用性(Availability)(对数据更新具备高可用性)

3. 容忍网络分区(Partition tolerance)(以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。)

根据定理,分布式系统只能满足三项中的两项而不可能满足全部三项。以上关于CAP的理论介绍来自维基百科。同理,如何保证微服务间的数据一致性也一直是一个持续的话题,其实就是 如何在这三者中做一个权衡

就之前此公众号已经有一系列的文章来讨论,关于微服务架构下的事务一致性的话题,包括 BASE理论,两阶段提交,三阶段提交,可靠事件模式,TCC模式,补偿模式等,想进一步了解的话可以参考这里:微服务架构下的数据一致性保证(一),微服务架构下的数据一致性保证(二),微服务架构下的数据一致性保证(三)。今天只是谈一谈其中的一种场景:使用消息系统进行微服务间通讯,如何来保证微服务间的数据一致性。

1. 问题的引出:

微服务架构之数据一致性问题

这里我们先以下面的一个例子来引出问题:以 公有云市场1中的一个部署产品来说,当用户想要部署一个公有云中已有的产品,比如Redis产品,用户会先去公有云市场中找到对应的Redis产品,当用户点击发布时,市场中会进行相应的记录,同时后台有真正负责部署的模块,此处我们叫部署模块。当产品部署成功后,部署模块和市场都会进行最终状态的同步。

1、公有云市场:此处指一个简单的模型,类似阿里云的云镜像市场或亚马逊aws中的镜像市场。在云镜像市场中,用户会选择其中感兴趣的产品比如mysql,然后进行付费,发布,这样省去了用户在自己的云平台环境中手动下载安装包,然后安装,配置,启动等一系列繁琐的过程。在云平台市场中,用户所需要做的只是一些必要的配置,然后点击启动就能完成一个产品的发布。一般都是这样一个先购买镜像后启动实例的过程。

以上都是在理想的情况下进行的,大致流程如下图:

此时,市场和部署模块都是是独立的微服务,当平台用户申请开通产品后,公有云市场会先进行一系列的初始化工作,并向部署模块中发送部署请求,当部署模块部署成功或者失败后,会进行相应的记录,市场也会将状态记录到本地数据库。由于市场和部署都是以微服务形式存在,都有自己的本地事务,此时,我们已经无法通过本地事务的控制来保证操作的原子性了。那么问题就随之而来:

假如市场模块在向部署模块发送完请求之后,市场微服务出现了数据库的连接异常(比如连接数据库的网络异常,数据库漂移等),此时市场会向前端报错,提示部署过程中出错,导致部署失败,但实际上部署模块已经在后台默默的为用户开通了实例。
同样的问题也会出现在,当向部署模块发送完请求后市场微服务出现了宕机等意外情况,市场微服务的数据库中干脆直接没有保存用户的此次开通的请求,但实际上部署模块却已经在这个过程中开通过了产品实例。

如果公有云平台对用户资源的实例限制是5个,即一个用户(比如试用版的用户)最多只能开通5个产品实例,则用户此时在市场中最多只能开4个,因为有一个实例在部署模块成功部署,但是市场模块却并不清楚,此时就出现了数据不一致的严重问题。那么该如何解决此类问题呢?如何解决这类业务前后不一致的问题呢?

2. 引入消息框架,解决数据不一致问题

这里我们采用了 消息通信框架Kafka,通过事件机制来完成相应的需求。

在采用Kafka来完成消息的投递的同时,不可避免地也会面对消息的丢失的意外情况。这里我们先来看一下我们实现的主场景,然后在后面我们会接着探讨,如何在业务层面保证消息的绝对投递和消费。

消息发送方的处理

流程处理如下:

我们来分析一下此种设计如何能够满足我们的需求:

市场模块操作Product和Event是在本地事务进行,保证了本地操作的一致性。

如果开通产品时,市场领域在事件发布之前就发生了异常,宕机或者数据库无法连接,根据设计,事件发布定时器和开通产品的Service是分离操作,此时发生宕机等意外事件,并不会影响数据库中的数据,并 在下次服务器正常后事件发布定时器会去Event表中查找尚未发布的数据进行发布并更新消息状态为PUBLISHED.

如果是在更新库中的状态时发生了意外呢?此时消息已经发出到Kafka broker,则下次服务正常时,会将这些消息重新发送,但是因为有了Key的唯一性,部署模块判断这些是重复数据,直接忽略即可。

当产品部署成功后,Market事件监听器收到通知,准备更新数据库时发生了意外宕机等,下次服务正常启动后事件监听器会从上次的消息偏移量处进行监听并更新Event表。

消息接收方的处理

下面我们来看一下消息的接收方部署模块如何处理从Kafka Broker接收到的消息呢?

以下是部署模块对消息处理的流程图,此处部署模块的部署过程使用了简略的示意图。实际的场景中,部署动作以及更新状态是一个复杂的过程,同时可能依赖轮询来完成操作。

部署模块的事件监听器,在收到通知后,直接调用部署的Service,更新Deploy_table表中的业务逻辑,同时更新Event_Table中的消息状态。另一方面, 部署模块的 Event定时器,也会定时从Event_Table中读取信息并将结果发布到Kafka Broker, 市场模块收到通知后进行自己的业务操作。

至于采用这种模式的原理以及原因和市场领域类似,这里不再赘述。

3.引入补偿+幂等机制,

保证消息投递的可靠性

刚才也谈到,Kafka等市面上的大多数消息系统本身是无法保证消息投递的可靠性的。所以,我们也必须要从业务上对消息的意外情况进行保证。下面,我们探讨一下如何从业务上来保证消息投递的绝对可靠呢?

这里,我们就要引入 补偿机制+幂等操作,我们在前面的步骤中已经将Event进行了数据库持久化,我们还需要以下几个步骤来从业务上对消息的绝对可靠进行保证:

一、完善事件表字段

我们在Event表中增加两个新的字段count和updateTime,用来标识此消息发送或者重试的次数。正常情况下,count为1,表示只发送一次。

二、定时补偿加错误重试

同时 使用异常事件发布定时器,每隔2分钟(此时间只是一个示例,实际应用中应大于业务中正常业务处理逻辑的时间)去Event表中查询状态为PUBLISHED的消息,如果对应的消息记录更新时间为两分钟之前的时间,我们就悲观的认为此消息丢失,进行消息的重发,同时更新字段updateTime并将count计数加1。

三、最后一道防线:对账记录,人工干预

如果发现重发次数已经大于5,则认为此时已经无法依靠消息系统来完成此消息的投递,需要最后的一道保障,就是记录下来并在日常进行的人工对账中人工审核。

 

四、幂等去重

何为幂等呢?因为存在重试和错误补偿机制,不可避免的在系统中存在重复收到消息的场景,接口的幂等性能提高数据的一致性.在编程中,一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

由于我们的定时补偿机制,消息的消费端也应该保证部署服务的操作是幂等的,即针对同一条消息多次发送的情况,我们应该保证这个消息实际上只会执行一次。这里如果发现消息是重复发送的,则直接将数据库中的执行结果读出并将结果推送到broker中,从而保证了消息的幂等性。

现在我们来分析一下此种策略如何保证的消息的绝对投递:

每条消息的产生都会在数据库中进行记录,保证消息的不丢失。

异常消息发布定时器会定时去Event表中查看异常消息,发现没有回应的数据则认为消息丢失,进行消息补偿,重新发送,如果连续5次依然失败则认为发生了异常,进行记录并人工干预对账。

对于部署模块(消息的消费端),如果消息丢失,则市场模块就无法收到回应(对应的Event表记录中的状态也不会修改),最终效果也会同#2情况,市场模块进行消息重发,如果重发次数超出了限制则会触发对账记录的业务逻辑。

4. 总结

本文通过采用消息系统进行微服务间的通信,加上一些设计上的变更,既保证了正常情况下(99.9%以上的情况)的逻辑的正确执行,也保证了极端情况下的数据一致性,满足了我们的业务需求,同时依赖市面上消息中间件强大的功能,极大的提高了系统的吞吐量。

针对Kafka等本身不可靠的问题,我们又通过修改业务场景的设计来保证了在极端情况下消息丢失时消息的可靠性,对应的也保证了业务的可靠性。此处只是以Kafka举例,如果是顾虑Kafka的本身消息不可靠的限制,可以考虑使用RabbitMQ或RocketMQ等市面上流行的消息通信框架。

概括来说,此方案主要保证了以下4个维度的一致性:

本地事务保证了业务持久化与消息持久化的一致性。

定时器保证了消息持久与消息投递的一致性。

消息中间件保证了消息的投递和消费的一致性。

业务补偿+幂等保证了消息失败下的一致性。

使用此种方案的弊端就是编码会大幅增加,为不同的微服务间增加不少额外的工作量,同时会产生较多的中间状态。对于业务中时间要求苛刻的场景,此方案不合适。(此处却符合本文中举例的场景,因为产品的开通,需要对容器进行操作,本身就是一个耗时的过程。)

数据一致性是微服务架构设计中唯恐避之不及却又不得不考虑的话题。通过保证最终数据的一致性,也是对CAP理论的一个折衷妥协方案,关于此方案的优劣,也不能简单的一言而概之,而是应该根据场景定夺,适合的才是最好的。

所以,我们在对微服务进行业务划分的时候就尽可能的避免“可能会产生一致性问题”的设计。如果这种设计过多,也许是时候考虑改改设计了。

from:http://windpoplar.iteye.com/blog/2353205

Java中的纤程库 – Quasar

最近遇到的一个问题大概是微服务架构中经常会遇到的一个问题:

服务 A 是我们开发的系统,它的业务需要调用 BCD 等多个服务,这些服务是通过http的访问提供的。 问题是 BCD 这些服务都是第三方提供的,不能保证它们的响应时间,快的话十几毫秒,慢的话甚至1秒多,所以这些服务的Latency比较长。幸运地是这些服务都是集群部署的,容错率和并发支持都比较高,所以不担心它们的并发性能,唯一不爽的就是就是它们的Latency太高了。

简化的微服务架构简化的微服务架构

系统A会从Client接收Request, 每个Request的处理都需要多次调用B、C、D的服务,所以完成一个Request可能需要1到2秒的时间。为了让A能更好地支持并发数,系统中使用线程池处理这些Request。当然这是一个非常简化的模型,实际的业务处理比较复杂。

可以预见,因为系统B、C、D的延迟,导致整个业务处理都很慢,即使使用线程池,但是每个线程还是会阻塞在B、C、D的调用上,导致I/O阻塞了这些线程, CPU利用率相对来说不是那么高。

当然在测试的时候使用的是B、C、D的模拟器,没有预想到它们的响应是那么慢,因此测试数据的结果还不错,吞吐率还可以,但是在实际环境中问题就暴露出来了。

概述

最开始线程池设置的是200,然后用HttpUrlConnection作为http client发送请求到B、C、D。当然HttpUrlConnection也有一些坑,比如Persistent ConnectionsCaveats of HttpURLConnection,跳出坑后性能依然不行。

通过测试,如果B、C、D等服务延迟接近0毫秒,则HttpUrlConnection的吞吐率(线程池的大小为200)能到40000 requests/秒,但是随着第三方服务的响应时间变慢,它的吞吐率急剧下降,B、C、D的服务的延迟为100毫秒的时候,则HttpUrlConnection的吞吐率降到1800 requests/秒,而B、C、D的服务的延迟为100毫秒的时候HttpUrlConnection的吞吐率降到550 requests/秒。

增加http.maxConnections系统属性并不能显著增加吞吐率。

如果增加调用HttpUrlConnection的线程池的大小,比如增加到2000,性能会好一些,但是B、C、D的服务的延迟为500毫秒的时候,吞吐率为3800 requests/秒,延迟为1秒的时候,吞吐率为1900 requests/秒。

虽然线程池的增大能带来性能的提升,但是线程池也不能无限制的增大,因为每个线程都会占用一定的资源,而且随着线程的增多,线程之间的切换也更加的频繁,对CPU等资源也是一种浪费。

切换成netty(channel pool),与B、C、D通讯的性能还不错, latency为500ms的时候吞吐率能达到10000 requests/秒,通讯不成问题,问题是需要将业务代码改成异步的方式,异步地接收到这些response后在一个线程池中处理这些消息。

下面列出了一些常用的http client:

  • JDK’s URLConnection uses traditional thread-blocking I/O.
  • Apache HTTP Client uses traditional thread-blocking I/O with thread-pools.
  • Apache Async HTTP Client uses NIO.
  • Jersey is a ReST client/server framework; the client API can use several HTTP client backends including URLConnection and Apache HTTP Client.
  • OkHttp uses traditional thread-blocking I/O with thread-pools.
  • Retrofit turns your HTTP API into a Java interface and can use several HTTP client backends including Apache HTTP Client.
  • Grizzly is network framework with low-level HTTP support; it was using NIO but it switched to AIO .
  • Netty is a network framework with HTTP support (low-level), multi-transport, includes NIO and native (the latter uses epoll on Linux).
  • Jetty Async HTTP Client uses NIO.
  • Async HTTP Client wraps either Netty, Grizzly or JDK’s HTTP support.
  • clj-http wraps the Apache HTTP Client.
  • http-kit is an async subset of clj-http implemented partially in Java directly on top of NIO.
  • http async client wraps the Async HTTP Client for Java.

这个列表摘自 High-Concurrency HTTP Clients on the JVM,不止于此,这篇文章重点介绍基于java纤程库quasar的实现的http client库,并比较了性能。我们待会再说。

回到我前面所说的系统,如何能更好的提供性能?有一种方案是借助其它语言的优势,比如Go,让Go来代理完成和B、C、D的请求,系统A通过一个TCP连接与Go程序交流。第三方服务B、C、D的Response结果可以异步地返回给系统A。

Go的优势在于可以实现request-per-goroutine,整个系统中可以有成千上万个goroutine。 goroutine是轻量级的,而且在I/O阻塞的时候可以不占用线程,这让Go可以轻松地处理上万个链接,即使I/O阻塞也没问题。Go和Java之间的通讯协议可以通过Protobuffer来实现,而且它们之间只保留一个TCP连接即可。

当然这种架构的修改带来系统稳定性的降低,服务A和服务B、C、D之间的通讯增加了复杂性。同时,因为是异步方式,服务A的业务也要实现异步方式,否则200个线程依然等待Response的话,还是一个阻塞的架构。

通过测试,这种架构可以带来稳定的吞吐率。 不管服务B、C、D的延迟有多久,A的吞吐率能维持15000 requests/秒。当然Go到B、C、D的并发连接数也有限制,我把最大值调高到20000。

这种曲折的方案的最大的两个弊病就是架构的复杂性以及对原有系统需要进行大的重构。 高复杂性带来的是系统的稳定性的降低,包括部署、维护、网络状况、系统资源等。同时系统要改成异步模型,因为系统业务线程发送Request后不能等待Go返回Response,它需要从Client接收更多的Request,而收到Response之后它才继续执行剩下的业务,只有这样才不会阻塞,进而提到系统的吞吐率。

将系统A改成异步,然后使用HttpUrlConnection线程池行不行?
HttpUrlConnection线程池还是导致和B、C、D通讯的吞吐率下降,但是Go这种方案和B、C、D通讯的吞吐率可以维持一个较高的水平。

考虑到Go的优势,那么能不能在Java中使用类似Go的这种goroutine模型呢?那就是本文要介绍的Java纤程库: [Quasar](http://docs.paralleluniverse.co/quasar/)。

实际测试结果表明Go和Netty都是两种比较好的解决方案,而且Netty的性能惊人的好,不好的地方正如前面所讲,我们需要将代码改成异步的处理。线程池中的业务单元用Netty发送完Request之后,不要等待Response, Response的处理交给另外的线程来处理,同时注意不要在Netty的Handler里面处理业务逻辑。要解决的问题就变成如何更高效的处理Response了,而不是第三方系统阻塞的问题。

quasar初步

以下介绍Java的另一个解决方案,也就是Java中的coroutine库,因为最近刚刚看这个库,感觉挺不错的,而且用它替换Thread改动较少。

Java官方并没有纤程库。但是伟大的社区提供了一个优秀的库,它就是Quasar。

创始人是Ron Pressler和Dafna Pressler,由Y Combinator孵化。

Quasar is a library that provides high-performance lightweight threads, Go-like channels, Erlang-like actors, and other asynchronous programming tools for Java and Kotlin.

Quasar提供了高性能轻量级的线程,提供了类似Go的channel,Erlang风格的actor,以及其它的异步编程的工具,可以用在Java和Kotlin编程语言中。Scala目前的支持还不完善,我想如果这个公司能快速的发展壮大,或者被一些大公司收购的话,对Scala的支持才能提上日程。

你需要把下面的包加入到你的依赖中:

  • Core (必须) co.paralleluniverse:quasar-core:0.7.5[:jdk8] (对于 JDK 8,需要增加jdk8 classifier)
  • Actor co.paralleluniverse:quasar-actors:0.7.5
  • Clustering co.paralleluniverse:quasar-galaxy:0.7.5
  • Reactive Stream co.paralleluniverse:quasar-reactive-streams:0.7.5
  • Kotlin co.paralleluniverse:quasar-kotlin:0.7.5

Quasar fiber依赖java instrumentation修改你的代码,可以在运行时通过java Agent实现,也可以在编译时使用ant task实现。

通过java agent很简单,在程序启动的时候将下面的指令加入到命令行:

1
-javaagent:path-to-quasar-jar.jar

对于maven来说,你可以使用插件maven-dependency-plugin,它会为你的每个依赖设置一个属性,以便在其它地方引用,我们主要想使用 ${co.paralleluniverse:quasar-core:jar}:

1
2
3
4
5
6
7
8
9
10
11
12
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>

然后你可以配置exec-maven-plugin或者maven-surefire-plugin加上agent参数,在执行maven任务的时候久可以使用Quasar了。

官方提供了一个Quasar Maven archetype,你可以通过下面的命令生成一个quasar应用原型:

1
2
3
4
5
6
7
8
git clone https://github.com/puniverse/quasar-mvn-archetype
cd quasar-mvn-archetype
mvn install
cd ..
mvn archetype:generate -DarchetypeGroupId=co.paralleluniverse -DarchetypeArtifactId=quasar-mvn-archetype -DarchetypeVersion=0.7.4 -DgroupId=testgrp -DartifactId=testprj
cd testprj
mvn test
mvn clean compile dependency:properties exec:exec

如果你使用gradle,可以看一下gradle项目模版:Quasar Gradle template project

最容易使用Quasar的方案就是使用Java Agent,它可以在运行时instrument程序。如果你想编译的时候就使用AOT instrumentation(Ahead-of-Time),可以使用Ant任务co.paralleluniverse.fibers.instrument.InstrumentationTask,它包含在quasar-core.jar中。

Quasar最主要的贡献就是提供了轻量级线程的实现,叫做fiber(纤程)。Fiber的功能和使用类似Thread, API接口也类似,所以使用起来没有违和感,但是它们不是被操作系统管理的,它们是由一个或者多个ForkJoinPool调度。一个idle fiber只占用400K内存,切换的时候占用更少的CPU,你的应用中可以有上百万的fiber,显然Thread做不到这一点。这一点和Go的goroutine类似。

Fiber并不意味着它可以在所有的场景中都可以替换Thread。当fiber的代码经常会被等待其它fiber阻塞的时候,就应该使用fiber。
对于那些需要CPU长时间计算的代码,很少遇到阻塞的时候,就应该首选thread

以上两条是选择fiber还是thread的判断条件,主要还是看任务是I/O blocking相关还是CPU相关。幸运地是,fiber API使用和thread使用类似,所以代码略微修改久就可以兼容。

Fiber特别适合替换哪些异步回调的代码。使用FiberAsync异步回调很简单,而且性能很好,扩展性也更高。

类似Thread, fiber也是用Fiber类表示:

1
2
3
4
5
6
new Fiber<V>() {
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();

与Thread类似,但也有些不同。Fiber可以有一个返回值,类型为泛型V,也可以为空Void。run也可以抛出异常InterruptedException

你可以传递SuspendableRunnableSuspendableCallable 给Fiber的构造函数:

1
2
3
4
5
new Fiber<Void>(new SuspendableRunnable() {
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();

甚至你可以调用Fiber的join方法等待它完成,调用get方法得到它的结果。

Fiber继承Strand类。Strand类代表一个Fiber或者Thread,提供了一些底层的方法。

逃逸的Fiber(Runaway Fiber)是指那些陷入循环而没有block、或者block fiber本身运行的线程的Fiber。偶尔有逃逸的fiber没有问题,但是太频繁会导致性能的下降,因为需要调度器的线程可能都忙于逃逸fiber了。Quasar会监控这些逃逸fiber,你可以通过JMX监控。如果你不想监控,可以设置系统属性co.paralleluniverse.fibers.detectRunawayFibersfalse

fiber中的ThreadLocal是fiber local的。InheritableThreadLocal继承父fiber的值。

Fiber、SuspendableRunnable 、SuspendableCallable 的run方法会抛出SuspendExecution异常。但这并不是真正意义的异常,而是fiber内部工作的机制,通过这个异常暂停因block而需要暂停的fiber。

任何在Fiber中运行的方法,需要声明这个异常(或者标记@Suspendable),都被称为suspendable method。

反射调用通常都被认为是suspendable, Java8 lambda 也被认为是suspendable。不应该将类构造函数或类初始化器标记为suspendable。

synchronized语句块或者方法会阻塞操作系统线程,所以它们不应该标记为suspendable。Blocking线程调用默认也不被quasar允许。但是这两种情况都可以被quasar处理,你需要在Quasar javaagent中分别加上mb参数,或者ant任务中加上allowMonitorsallowBlocking属性。

quasar原理

Quasar最初fork自Continuations Library

如果你了解其它语言的coroutine, 比如Lua,你久比较容易理解quasar的fiber了。 Fiber实质上是 continuation, continuation可以捕获一个计算的状态,可以暂停当前的计算,等隔一段时间可以继续执行。Quasar通过instrument修改suspendable方法。Quasar的调度器使用ForkJoinPool调度这些fiber。

Fiber调度器FiberScheduler是一个高效的、work-stealing、多线程的调度器。

默认的调度器是FiberForkJoinScheduler,但是你可以使用自己的线程池去调度,请参考FiberExecutorScheduler

当一个类被加载时,Quasar的instrumentation模块 (使用 Java agent时) 搜索suspendable 方法。每一个suspendable 方法 f通过下面的方式 instrument:
它搜索对其它suspendable方法的调用。对suspendable方法g的调用,一些代码会在这个调用g的前后被插入,它们会保存和恢复fiber栈本地变量的状态,记录这个暂停点。在这个“suspendable function chain”的最后,我们会发现对Fiber.park的调用。park暂停这个fiber,扔出 SuspendExecution异常。

g block的时候,SuspendExecution异常会被Fiber捕获。 当Fiber被唤醒(使用unpark), 方法f会被调用, 执行记录显示它被block在g的调用上,所以程序会立即跳到f调用g的那一行,然后调用它。最终我们会到达暂停点,然后继续执行。当g返回时, f中插入的代码会恢复f的本地变量。

过程听起来很复杂,但是它只会带来3% ~ 5%的性能的损失。

下面看一个简单的例子, 方法m2声明抛出SuspendExecution异常,方法m1调用m2和m3,所以也声明抛出这个异常,最后这个异常会被Fiber所捕获:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {
String m = “m1”;
System.out.println(“m1 begin”);
m = m2();
m = m3();
System.out.println(“m1 end”);
System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
return “m2”;
}
static String m3() throws SuspendExecution, InterruptedException {
return “m3”;
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
new Fiber<Void>(“Caller”, new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
m1();
}
}).start();
}
}

反编译这段代码 (一般的反编译软件如jd-gui不能把这段代码反编译java文件,Procyon虽然能反编译,但是感觉反编译有错。所以我们还是看字节码吧):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}

这段反编译的代码显示了方法m被instrument后的样子,虽然我们不能很清楚的看到代码执行的样子,但是也可以大概地看到它实际在方法的最开始加入了此方法的栈信息的检查(#0 ~ #49,如果是第一次运行这个方法,则直接运行,
然后在一些暂停点上加上一些栈压入的处理,并且可以在下次执行的时候直接跳到上次的暂停点上。

官方的工程师关于Quasar的instrument操作如下:

  • Fully analyze the bytecode to find all the calls into suspendable methods. A method that (potentially) calls into other suspendable methods is itself considered suspendable, transitively.
  • Inject minimal bytecode in suspendable methods (and only them) that will manage an user-mode stack, in the following places:
    • At the beginning we’ll check if we’re resuming the fiber and only in this case we’ll jump into the relevant bytecode index.
    • Before a call into another suspendable method we’ll push a snapshot of the current activation frame, including the resume bytecode index; we can do it because we know the structure statically from the analysis phase.
    • After a call into another suspendable method we’ll pop the top activation frame and, if resumed, we’ll restore it in the current fiber.

我并没有更深入的去了解Quasar的实现细节以及调度算法,有兴趣的读者可以翻翻它的代码。如果你有更深入的剖析,请留下相关的地址,以便我加到参考文档中。

曾经, 陆陆续续也有一些Java coroutine的实现(coroutine-libraries), 但是目前来说最好的应该还是Quasar。

Oracle会实现一个官方的纤程库吗?目前来说没有看到这方面的计划,而且从Java的开发进度上来看,这个特性可能是遥遥无期的,所以目前还只能借助社区的力量,从第三方库如Quasar中寻找解决方案。

更多的Quasar知识,比如Channel、Actor、Reactive Stream 的使用可以参考官方的文档,官方也提供了多个例子

Comsat介绍

Comsat又是什么?

Comsat还是Parallel Universe提供的集成Quasar的一套开源库,可以提供web或者企业级的技术,如HTTP服务和数据库访问。

Comsat并不是一套web框架。它并不提供新的API,只是为现有的技术如Servlet、JAX-RS、JDBC等提供Quasar fiber的集成。

它包含非常多的库,比如Spring、ApacheHttpClient、OkHttp、Undertow、Netty、Kafka等。

性能对比

刘小溪在CSDN上写了一篇关于Quasar的文章:次时代Java编程(一):Java里的协程,写的挺好,建议读者读一读。

它参考Skynet的测试写了代码进行对比,这个测试是并发执行整数的累加:
测试结果是Golang花了261毫秒,Quasar花了612毫秒。其实结果还不错,但是文中指出这个测试没有发挥Quasar的性能。因为quasar的性能主要在于阻塞代码的调度上。
虽然文中加入了排序的功能,显示Java要比Golang要好,但是我觉得这又陷入了另外一种错误的比较, Java的排序算法使用TimSort,排序效果相当好,Go的排序效果显然比不上Java的实现,所以最后的测试主要测试排序算法上。 真正要体现Quasar的性能还是测试在有阻塞的情况下fiber的调度性能。

HttpClient

话题扯的越来越远了,拉回来。我最初的目的是要解决的是在第三方服务响应慢的情况下提高系统 A 的吞吐率。最初A是使用200个线程处理业务逻辑,调用第三方服务。因为线程总是被第三方服务阻塞,所以系统A的吞吐率总是很低。

虽然使用Go可以解决这个问题,但是对于系统A的改造比较大,还增加了系统的复杂性。Netty性能好,改动量还可以接受,但是不妨看一下这个场景,系统的问题是由http阻塞引起。

这正是Quasar fiber适合的场景,如果一个Fiber被阻塞,它可以暂时放弃线程,以便线程可以用来执行其它的Fiber。虽然整个集成系统的吞吐率依然很低,这是无法避免的,但是系统的吞吐率确很高。

Comsat提供了Apache Http Client的实现: FiberHttpClientBuilder

1
2
3
4
final CloseableHttpClient client = FiberHttpClientBuilder.
create(2). // use 2 io threads
setMaxConnPerRoute(concurrencyLevel).
setMaxConnTotal(concurrencyLevel).build();

然后在Fiber中久可以调用:

1
String response = client.execute(new HttpGet(“http://localhost:8080”), BASIC_RESPONSE_HANDLER);

你也可以使用异步的HttpClient:

1
2
3
4
5
6
final CloseableHttpAsyncClient client = FiberCloseableHttpAsyncClient.wrap(HttpAsyncClients.
custom().
setMaxConnPerRoute(concurrencyLevel).
setMaxConnTotal(concurrencyLevel).
build());
client.start();

Comsat还提供了Jersey Http Client: AsyncClientBuilder.newClient()

甚至提供了RetrofitOkHttp的实现。

经过测试,虽然随着系统B、C、D的响应时间的拉长,吞吐率有所降低,但是在latency为100毫秒的时候吞吐率依然能达到9900 requests/秒,可以满足我们的需求,而我们的代码改动的比较小。

综上所述,如果想彻底改造系统A,则可以使用Go库重写,或者使用Netty + Rx的方式去处理,都能达到比较好的效果。如果想改动比较小,可以考虑使用quasar替换线程对代码进行维护。

我希望本文不要给读者造成误解,以为Java NIO/Selector这种方式不能解决本文的问题,也就是第三方阻塞的问题。 事实上Java NIO也正是适合解决这样的问题, 比如Netty性能就不错,但是你需要小心的是, 不要让你的这个client对外又变成阻塞的方式,而是程序应该异步的去发送request和处理response。当然本文重点不是介绍这种实现,而是介绍Java的线程库,它可以改造传统的代码,即使有阻塞,也只是阻塞Fiber,而不是阻塞线程,这是另一个解决问题的思路。

另一篇关于Quasar的文档: 继续了解Java的纤程库 – Quasar

参考文档

from:http://colobu.com/2016/07/14/Java-Fiber-Quasar/