正文
我们一开始就认为单个 ES 集群搞不定海量 repo ,因为大家都知道 ES 在 Master 被选举之前是一个 P2P 的系统,但是当 Master 被选取后,它的管理本质上是 Master 和 slave 的模式。所以随着集群规模的增大,以及 repo 数量的增多,整个集群管理负担越来越大,对于 Master 要求会越来越高,最终会导致集群挂掉。
因为我们团队都是比较有经验的 ES 工程师,所以我们一开始就提出
产品必须是多集群的海量 Cluster 的模型
。如果 Cluster 出现资源不够的情况,我们从运维的角度加一些新的 node 即可。另外如果某一个 Cluster 里面 repo 达到了一定数量,超过了 ES 本身系统资源的瓶颈,我们就会选择增加一个新的 Cluster 来搞定这个事情。
图 7
当我们敲定了多租户模型之后很快迭代出了第一版系统,第一版系统架构图如图 7 所示,最上面一层会有 Portal、logical、SDK 等等。右边是 export,我们的数据从 export 过来,所以要跟 export 对接。
在 API Server 这层我们做了一些服务的解藕,首先是一个 search 集群,一个索引集群,同时其他类型业务会被集中放在一个集群。我们的数据存在 Mongo 里面,下面是整个 ES 集群的集合,最左边需要有一个管理的服务,对 ES 集群进行维护,比如对 ES 进行优化等。
我们第一版系统上线之后遇到什么问题?
首先,因为打点的 QPS 非常高,导致我们的 Mongo 成为系统里最先达到瓶颈的一环,它的查询压力非常大。那么,怎么解决这个问题?其实,我们需要一套 Cache 系统,这样就可以把大量请求拦在 Mongo 之前来解决问题。不过还好,因为我们已经有了一套缓存系统,所以不需要花太长时间来解决这个问题。
七牛的缓存系统是一个二级缓存系统,第一级指的是 Local Cache,第二级是指 Memory Cache。举个例子,当一个请求发出来,如果 Local Cache 没有命中,它会去 Memory Cache 找,如果 Memory Cache 也没有找到,这时候它会通过 qconf Master 到 Mongo 里找,如果 Mongo 找到之后,会把这个值回写到 Memory Cache 和 Local Cache ,那么第二次请求时直接在 Local Cache 会被拦掉。如果这个请求很不幸命中了另外一个API Server,实际上 Local Cache 是没有的,但是 Memory Cache 也是有的,所以直接去 Memory Cashe 就可以拿到值了,然后再回写 Local Cache。
通过计算,我们发现,基本上 99% 的请求都会被整个 Cache 系统拦住,而这个系统里基本上有 80% 的请求会被 Local Cache 拦住,也就是说整个 Memory Cache 和 Mongo 的压力都是非常小的。所以我们马上迭代出了新一版的系统,即我们加入了一个 Qconf Cluster。
这时候我们遇到的新问题是什么?第一个问题是 LAG,经常有用户反映这个系统查不到最近 10 分钟或者 1 小时的数据。那么,LAG 是怎么产生的?我们认为整个写点代码是有问题的,写点的整个效率有问题。所以,我们要对写点做一些优化,在开始优化前,我们首先要了解 Benchmark,对整个效率有一个正确的认识。首先,我们要正确知道 ES 真正的吞吐量在哪里,因为我们整体资源是够的。比如,我们有十台机器,但是为什么我们搞不定这些量,对用户产生 LAG 了呢?所以我们先对 ES 集群做一个 Benchmark。
我们拿到一个具体的值,ES 集群做一个 Benchmark 要怎么做?官方说,首先你一个并发控制 batchSize,从 0 加到 1 万,
当你的 batchSize
增加没有任何收益时,固定这个值,再去增加它的并发数。当并发数增加也没有收益的时候,固定的这两个值就代表了 ES 最佳的效率。
那也就是说,其实 ES 打点的吞吐量其实取决于客户端,对于这种推送系统其实大部分都是这样。即我们对客户端程序要求比较高,当然我们的数据如果是直接从内存里面来的,没有问题,我们可以一直
维持这个并发数
。但事实上我们的数据时 export 打给我们的,它发的请求可能是零散的,可能一个 batch 有可能一个只有一条数据或者一万条数据,这种零散的数据打到 ES 之后,整个吞吐量会下滑。
所以,我们其实在 ES 之前需要做一个数据传输系统,这套系统要搞定 export 打给我们数据零散的点,我们要把它包装好,以 ES 最佳的效率打给 ES,同时这套系统应该是多租户的,我们内部称这个系统叫 Producer。那么,这个系统应该怎么做?首先,我们对数据传输做一个简单的思考,很多人经常会说,你做一个下游系统,对我的服务应该是稳定的,不管我以什么样的姿势跟你交互。或者说传输的下游消费速度也是稳定的,很多时候我们也会认为,这个链路整体传输速度其实取决于上游和下游的影响。但事实上并非如此,对于上游或者下游的速度来讲,吞吐量核心取决于并发数和并发大小,而维持最大吞吐量的核心是要维持并发数和并发大小。另外,要维持整体吞吐量要考虑三方面因素:拉取效率、链路效率和吞吐效率。
举个例子,我们要搞定从 Kafka 到 ES 每秒 10K 的流量。我们知道 Kafka 效率很高,可能三个 Patation 可以搞定这个事情。那么我们需要的并发是 3。它的整个batchSize 是 10K,这对 Kafka 是最友好的。但是对于 ES 来说,你要搞定每秒 10K 的流量,其实他的姿势应该是这样的,可能你的并发数是5,你的 batchSize 是 20K。
对于一个正常的系统,我们的 batchSize 该如何调?肯定是以最慢的为准,调到 20×5 的并发数,但事实上即便这样也解决不了。因为链路本身也有效率的损耗,比如你数据是 JSON 格式,它首先要做 unMarshal,这个时间其实是 CPU 的开销。所以真正要搞定这个问题,你可能需要大概 20K×8 的姿势才能搞定这个问题。那么,如何解决这个问题?我们认为我们需要引进一个东西来对上游下游做解藕,我们称之为 Memory queue。
图 8
所以,我们首先需要一个队列,而且是内存队列,因为我们知道数据从上游拉下来之后进入内存队列,它的效率是最高的。进入内存队列之后,我们需要一个 Source 和 sink,因为我们要动态调整上游的 Source 数量和它的 batchSize 、下游的 sink 数量和它的 batchSize。与此同时,我们要做一个事务,为什么呢?用 ES 的人都知道,当我们通过bulk接口打点的时候,一个请求里面有 2w 个点,ES 其实是非常不友好的,他会告诉你这 2 万个点里有一半失败一半成功。其实这对客户端的负担很重。所以我们希望这套系统带事务,这样对于客户端来说,你打给我的点可以保证要么都成功要么都失败。