字节跳动 Go RPC 框架 KiteX 性能优化实践

本文选自“字节跳动基础架构实践”系列文章。

“字节跳动基础架构实践”系列文章是由字节跳动基础架构部门各技术团队及专家倾力打造的技术干货内容,和大家分享团队在基础架构发展和演进过程中的实践经验与教训,与各位技术同学一起交流成长。

KiteX 自 2020.04 正式发布以来,公司内部服务数量 8k+,QPS 过亿。经过持续迭代,KiteX 在吞吐和延迟表现上都取得了显著收益。本文将简单分享一些较有成效的优化方向,希望为大家提供参考。

前言

KiteX 是字节跳动框架组研发的下一代高性能、强可扩展性的 Go RPC 框架。除具备丰富的服务治理特性外,相比其他框架还有以下特点:集成了自研的网络库 Netpoll;支持多消息协议(Thrift、Protobuf)和多交互方式(Ping-Pong、Oneway、 Streaming);提供了更加灵活可扩展的代码生成器。

目前公司内主要业务线都已经大范围使用 KiteX,据统计当前接入服务数量多达 8k。KiteX 推出后,我们一直在不断地优化性能,本文将分享我们在 Netpoll 和 序列化方面的优化工作。

自研网络库 Netpoll 优化

自研的基于 epoll 的网络库 —— Netpoll,在性能方面有了较为显著的优化。测试数据表明,当前版本(2020.12) 相比于上次分享时(2020.05),吞吐能力 ↑30%,延迟 AVG ↓25%,TP99 ↓67%,性能已远超官方 net 库。以下,我们将分享两点显著提升性能的方案。

epoll_wait 调度延迟优化

Netpoll 在刚发布时,遇到了延迟 AVG 较低,但 TP99 较高的问题。经过认真研究 epoll_wait,我们发现结合 polling 和 event trigger 两种模式,并优化调度策略,可以显著降低延迟。

首先我们来看 Go 官方提供的 syscall.EpollWait 方法:

func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error)

这里共提供 3 个参数,分别表示 epoll 的 fd、回调事件、等待时间,其中只有 msec 是动态可调的。

通常情况下,我们主动调用 EpollWait 都会设置 msec=-1,即无限等待事件到来。事实上不少开源网络库也是这么做的。但是我们研究发现,msec=-1 并不是最优解。

epoll_wait 内核源码(如下) 表明,msec=-1 比 msec=0 增加了 fetch_events 检查,因此耗时更长。

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
    ...
    if (timeout > 0) {
       ...
    } else if (timeout == 0) {
        ...
        goto send_events;
    }

fetch_events:

if (eavail)
goto send_events;

send_events:

Benchmark 表明,在有事件触发的情况下,msec=0 比 msec=-1 调用要快 18% 左右,因此在频繁事件触发场景下,使用 msec=0 调用明显是更优的。

字节跳动 Go RPC 框架 KiteX 性能优化实践

而在无事件触发的场景下,使用 msec=0 显然会造成无限轮询,空耗大量资源。

综合考虑后,我们更希望在有事件触发时,使用 msec=0 调用,而在无事件时,使用 msec=-1 来减少轮询开销。伪代码如下:

var msec = -1
for {
   n, err = syscall.EpollWait(epfd, events, msec)
   if n <= 0 {
      msec = -1
      continue
   }
   msec = 0
   ...
}

那么这样就可以了吗?事实证明优化效果并不明显。

我们再做思考:

msec=0 仅单次调用耗时减少 50ns,影响太小,如果想要进一步优化,必须要在调度逻辑上做出调整。

进一步思考:

上述伪代码中,当无事件触发,调整 msec=-1 时,直接 continue 会立即再次执行 EpollWait,而由于无事件,msec=-1,当前 goroutine 会 block 并被 P 切换。但是被动切换效率较低,如果我们在 continue 前主动为 P 切换 goroutine,则可以节约时间。因此我们将上述伪代码改为如下:

var msec = -1
for {
   n, err = syscall.EpollWait(epfd, events, msec)
   if n <= 0 {
      msec = -1
      runtime.Gosched()
      continue
   }
   msec = 0
   ...
}

测试表明,调整代码后,吞吐量 ↑12%,TP99 ↓64%,获得了显著的延迟收益。

合理利用 unsafe.Pointer

继续研究 epoll_wait,我们发现 Go 官方对外提供的 syscall.EpollWait 和 runtime 自用的 epollwait 是不同的版本,即两者使用了不同的 EpollEvent。以下我们展示两者的区别:

// @syscall
type EpollEvent struct {
   Events uint32
   Fd     int32
   Pad    int32
}
// @runtime
type epollevent struct {
   events uint32
   data   [8]byte // unaligned uintptr
}

我们看到,runtime 使用的 epollevent 是系统层 epoll 定义的原始结构;而对外版本则对其做了封装,将 epoll_data(epollevent.data) 拆分为固定的两字段:Fd 和 Pad。那么 runtime 又是如何使用的呢?在源码里我们看到这样的逻辑:

*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd

pd := *(**pollDesc)(unsafe.Pointer(&ev.data))

显然,runtime 使用 epoll_data(&ev.data) 直接存储了 fd 对应结构体(pollDesc)的指针,这样在事件触发时,可以直接找到结构体对象,并执行相应逻辑。而对外版本则由于只能获得封装后的 Fd 参数,因此需要引入额外的 Map 来增删改查结构体对象,这样性能肯定相差很多。

所以我们果断抛弃了 syscall.EpollWait,转而仿照 runtime 自行设计了 EpollWait 调用,同样采用 unsafe.Pointer 存取结构体对象。测试表明,该方案下 吞吐量 ↑10%,TP99 ↓10%,获得了较为明显的收益。

Thrift 序列化/反序列化优化

序列化是指把数据结构或对象转换成字节序列的过程,反序列化则是相反的过程。RPC 在通信时需要约定好序列化协议,client 在发送请求前进行序列化,字节序列通过网络传输到 server,server 再反序列进行逻辑处理,完成一次 RPC 请求。Thrift 支持 Binary、Compact 和 JSON 序列化协议。目前公司内部使用的基本都是 Binary,这里只介绍 Binary 协议。

Binary 采用 TLV 编码实现,即每个字段都由 TLV 结构来描述,TLV 意为:Type 类型, Lenght 长度,Value 值,Value 也可以是个 TLV 结构,其中 Type 和 Length 的长度固定,Value 的长度则由 Length 的值决定。TLV 编码结构简单清晰,并且扩展性较好,但是由于增加了 Type 和 Length,有额外的内存开销,特别是在大部分字段都是基本类型的情况下有不小的空间浪费。

序列化和反序列的性能优化从大的方面来看可以从空间和时间两个维度进行优化。从兼容已有的 Binary 协议来看,空间上的优化似乎不太可行,只能从时间维度进行优化,包括:

  1. 减少内存操作次数,包括内存分配和拷贝,尽量预分配内存,减少不必要的开销;

  2. 减少函数调用次数,比如可调整代码结构和 inline 等手段进行优化;

调研

根据 go_serialization_benchmarks 的压测数据,我们找到了一些性能卓越的序列化方案进行调研,希望能够对我们的优化工作有所启发。

通过对 protobuf、gogoprotobuf 和 Cap’n Proto 的分析,我们得出以下结论:

  1. 网络传输中出于 IO 的考虑,都会尽量压缩传输数据,protobuf 采用了 Varint 编码在大部分场景中都有着不错的压缩效果;

  2. gogoprotobuf 采用预计算方式,在序列化时能够减少内存分配次数,进而减少了内存分配带来的系统调用、锁和 GC 等代价;

  3. Cap’n Proto 直接操作 buffer,也是减少了内存分配和内存拷贝(少了中间的数据结构),并且在 struct pointer 的设计中把固定长度类型数据和非固定长度类型数据分开处理,针对固定长度类型可以快速处理;

从兼容性考虑,不可能改变现有的 TLV 编码格式,因此数据压缩不太现实,但是 2 和 3 对我们的优化工作是有启发的,事实上我们也是采取了类似的思路。

思路

减少内存操作

buffer 管理

无论是序列化还是反序列化,都是从一块内存拷贝数据到另一块内存,这就涉及到内存分配和内存拷贝操作,尽量避免内存操作可以减少不必要的系统调用、锁和 GC 等开销。

事实上 KiteX 已经提供了 LinkBuffer 用于 buffer 的管理,LinkBuffer 设计上采用链式结构,由多个 block 组成,其中 block 是大小固定的内存块,构建对象池维护空闲 block,由此复用 block,减少内存占用和 GC。

刚开始我们简单地采用 sync.Pool 来复用 netpoll 的 LinkBufferNode,但是这样仍然无法解决对于大包场景下的内存复用(大的 Node 不能回收,否则会导致内存泄漏)。目前我们改成了维护一组 sync.Pool,每组中的 buffer size 都不同,新建 block 时根据最接近所需 size 的 pool 中去获取,这样可以尽可能复用内存,从测试来看内存分配和 GC 优化效果明显。

string / binary 零拷贝

对于有一些业务,比如视频相关的业务,会在请求或者返回中有一个很大的 Binary 二进制数据代表了处理后的视频或者图片数据,同时会有一些业务会返回很大的 String(如全文信息等)。这种场景下,我们通过火焰图看到的热点都在数据的 copy 上,那我们就想了,我们是否可以减少这种拷贝呢?

答案是肯定的。既然我们底层使用的 Buffer 是个链表,那么就可以很容易地在链表中间插入一个节点。

字节跳动 Go RPC 框架 KiteX 性能优化实践

我们就采用了类似的思想,当序列化的过程中遇到了 string 或者 binary 的时候, 将这个节点的 buffer 分成两段,在中间原地插入用户的 string / binary 对应的 buffer,这样可以避免大的 string / binary 的拷贝了。

这里再介绍一下,如果我们直接用 []byte(string) 去转换一个 string 到 []byte 的话实际上是会发生一次拷贝的,原因是 Go 的设计中 string 是 immutable 的但是 []byte 是 mutable 的,所以这么转换的时候会拷贝一次;如果要不拷贝转换的话,就需要用到 unsafe 了:

func StringToSliceByte(s string) []byte {
   l := len(s)
   return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
      Data: (*(*reflect.StringHeader)(unsafe.Pointer(&s))).Data,
      Len:  l,
      Cap:  l,
   }))
}

这段代码的意思是,先把 string 的地址拿到,再拼装上一个 slice byte 的 header,这样就可以不拷贝数据而将 string 转换成 []byte 了,不过要注意这样生成的 []byte 不可写,否则行为未定义。

预计算

线上存在某些服务有大包传输的场景,这种场景下会引入不小的序列化 / 反序列化开销。一般大包都是容器类型的大小非常大导致的,如果能够提前计算出 buffer,一些 O(n) 的操作就能降到 O(1),减少了函数调用次数,在大包场景下也大量减少了内存分配的次数,带来的收益是可观的。

基本类型

如果容器元素为基本类型(bool, byte, i16, i32, i64, double)的话,由于基本类型大小固定,在序列化时是可以提前计算出总的大小,并且一次性分配足够的 buffer,O(n) 的 malloc 操作次数可以降到 O(1),从而大量减少了 malloc 的次数,同理在反序列化时可以减少 next 的操作次数。

struct 字段重排

上面的优化只能针对容器元素类型为基本类型的有效,那么对于元素类型为 struct 的是否也能优化呢?答案是肯定的。

沿用上面的思路,假如 struct 中如果存在基本类型的 field,也可以预先计算出这些 field 的大小,在序列化时为这些 field 提前分配 buffer,写的时候也把这些 field 顺序统一放到前面写,这样也能在一定程度上减少 malloc 的次数。

一次性计算

上面提到的是基本类型的优化,如果在序列化时,先遍历一遍 request 所有 field,便可以计算得到整个 request 的大小,提前分配好 buffer,在序列化和反序列时直接操作 buffer,这样对于非基本类型也能有优化效果。

定义新的 codec 接口:

type thriftMsgFastCodec interface {
   BLength() int // count length of whole req/resp
   FastWrite(buf []byteint
   FastRead(buf []byte) (int, error)
}

在 Marshal 和 Unmarshal 接口中做相应改造:

func (c thriftCodec) Marshal(ctx context.Context, message remote.Message, out remote.ByteBuffer) error {
    ...
    if msg, ok := data.(thriftMsgFastCodec); ok {
       msgBeginLen := bthrift.Binary.MessageBeginLength(methodName, thrift.TMessageType(msgType), int32(seqID))
       msgEndLen := bthrift.Binary.MessageEndLength()
       buf, err := out.Malloc(msgBeginLen + msg.BLength() + msgEndLen)// malloc once
       if err != nil {
          return perrors.NewProtocolErrorWithMsg(fmt.Sprintf("thrift marshal, Malloc failed: %s", err.Error()))
       }
       offset := bthrift.Binary.WriteMessageBegin(buf, methodName, thrift.TMessageType(msgType), int32(seqID))
       offset += msg.FastWrite(buf[offset:])
       bthrift.Binary.WriteMessageEnd(buf[offset:])
       return nil
    }
    ...
}

func (c thriftCodec) Unmarshal(ctx context.Context, message remote.Message, in remote.ByteBuffer) error {

data := message.Data()
if msg, ok := data.(thriftMsgFastCodec); ok && message.PayloadLen() != 0 {
msgBeginLen := bthrift.Binary.MessageBeginLength(methodName, msgType, seqID)
buf, err := tProt.next(message.PayloadLen() – msgBeginLen – bthrift.Binary.MessageEndLength()) // next once
if err != nil {
return remote.NewTransError(remote.PROTOCOL_ERROR, err.Error())
}
_, err = msg.FastRead(buf)
if err != nil {
return remote.NewTransError(remote.PROTOCOL_ERROR, err.Error())
}
err = tProt.ReadMessageEnd()
if err != nil {
return remote.NewTransError(remote.PROTOCOL_ERROR, err.Error())
}
tProt.Recycle()
return err
}

}

生成代码中也做相应改造:

func (p *Demo) BLength() int {
        l := 0
        l += bthrift.Binary.StructBeginLength("Demo")
        if p != nil {
                l += p.field1Length()
                l += p.field2Length()
                l += p.field3Length()
    ...
        }
        l += bthrift.Binary.FieldStopLength()
        l += bthrift.Binary.StructEndLength()
        return l
}

func (p *Demo) FastWrite(buf []byte) int {
offset := 0
offset += bthrift.Binary.WriteStructBegin(buf[offset:], “Demo”)
if p != nil {
offset += p.fastWriteField2(buf[offset:])
offset += p.fastWriteField4(buf[offset:])
offset += p.fastWriteField1(buf[offset:])
offset += p.fastWriteField3(buf[offset:])
}
offset += bthrift.Binary.WriteFieldStop(buf[offset:])
offset += bthrift.Binary.WriteStructEnd(buf[offset:])
return offset
}

使用 SIMD 优化 Thrift 编码

公司内广泛使用 list<i64/i32> 类型来承载 ID 列表,并且 list<i64/i32> 的编码方式十分符合向量化的规律,于是我们用了 SIMD 来优化 list<i64/i32> 的编码过程。

我们使用了 avx2,优化后的结果比较显著,在大数据量下针对 i64 可以提升 6 倍性能,针对 i32 可以提升 12 倍性能;在小数据量下提升更明显,针对 i64 可以提升 10 倍,针对 i32 可以提升 20 倍。

减少函数调用

inline

inline 是在编译期间将一个函数调用原地展开,替换成这个函数的实现,它可以减少函数调用的开销以提高程序的性能。

在 Go 中并不是所有函数都能 inline,使用参数-gflags="-m"运行进程,可显示被 inline 的函数。以下几种情况无法内联:

  1. 包含循环的函数;

  2. 包含以下内容的函数:闭包调用,select,for,defer,go 关键字创建的协程;

  3. 超过一定长度的函数,默认情况下当解析 AST 时,Go 申请了 80 个节点作为内联的预算。每个节点都会消耗一个预算。比如,a = a + 1 这行代码包含了 5 个节点:AS, NAME, ADD, NAME, LITERAL。当一个函数的开销超过了这个预算,就无法内联。

编译时通过指定参数-l可以指定编译器对代码内联的强度(go 1.9+),不过这里不推荐大家使用,在我们的测试场景下是 buggy 的,无法正常运行:

// The debug['l'] flag controls the aggressiveness. Note that main() swaps level 0 and 1, making 1 the default and -l disable. Additional levels (beyond -l) may be buggy and are not supported.
//      0: disabled
//      1: 80-nodes leaf functions, oneliners, panic, lazy typechecking (default)
//      2: (unassigned)
//      3: (unassigned)
//      4: allow non-leaf functions

内联虽然可以减少函数调用的开销,但是也可能因为存在重复代码,从而导致 CPU 缓存命中率降低,所以并不能盲目追求过度的内联,需要结合 profile 结果来具体分析。

go test -gcflags='-m=2' -v -test.run TestNewCodec 2>&1 | grep "function too complex" | wc -l
48

go test -gcflags=‘-m=2 -l=4’ -v -test.run TestNewCodec 2>&1 | grep “function too complex” | wc -l
25

 

从上面的输出结果可以看出,加强内联程度确实减少了一些”function too complex”,看下 benchmark 结果:

字节跳动 Go RPC 框架 KiteX 性能优化实践

上面开启最高程度的内联强度,确实消除了不少因为“function too complex”带来无法内联的函数,但是压测结果显示收益不太明显。

测试结果

我们构建了基准测试来对比优化前后的性能,下面是测试结果。

环境:Go 1.13.5 darwin/amd64 on a 2.5 GHz Intel Core i7 16GB

小包

data size: 20KB

字节跳动 Go RPC 框架 KiteX 性能优化实践

大包

data size: 6MB

字节跳动 Go RPC 框架 KiteX 性能优化实践

无拷贝序列化

在一些 request 和 response 数据较大的服务中,序列化和反序列化的代价较高,有两种优化思路:

  1. 如前文所述进行序列化和反序列化的优化

  2. 以无拷贝序列化的方式进行调用

调研

通过无拷贝序列化进行 RPC 调用,最早出自 Kenton Varda 的 Cap’n Proto 项目,Cap’n Proto 提供了一套数据交换格式和对应的编解码库。

Cap’n Proto 本质上是开辟一个 bytes slice 作为 buffer ,所有对数据结构的读写操作都是直接读写 buffer,读写完成后,在头部添加一些 buffer 的信息就可以直接发送,对端收到后即可读取,因为没有 Go 语言结构体作为中间存储,所有无需序列化这个步骤,反序列化亦然。

简单总结下 Cap’n Proto 的特点:

  1. 所有数据的读写都是在一段连续内存中

  2. 将序列化操作前置,在数据 Get/Set 的同时进行编解码

  3. 在数据交换格式中,通过 pointer(数据存储位置的 offset)机制,使得数据可以存储在连续内存的任意位置,进而使得结构体中的数据可以以任意顺序读写

    1. 对于结构体的固定大小字段,通过重新排列,使得这些字段存储在一块连续内存中
    2. 对于结构体的不定大小字段(如 list),则通过一个固定大小的 pointer 来表示,pointer 中存储了包括数据位置在内的一些信息

首先 Cap’n Proto 没有 Go 语言结构体作为中间载体,得以减少一次拷贝,然后 Cap’n Proto 是在一段连续内存上进行操作,编码数据的读写可以一次完成,因为这两个原因,使得 Cap’ Proto 的性能表现优秀。

下面是相同数据结构下 Thrift 和 Cap’n Proto 的 Benchmark,考虑到 Cap’n Proto 是将编解码操作前置了,所以对比的是包括数据初始化在内的完整过程,即结构体数据初始化+(序列化)+写入 buffer +从 buffer 读出+(反序列化)+从结构体读出数据。

struct MyTest {
    1: i64 Num,
    2: Ano Ano,
    3: list<i64> Nums, // 长度131072 大小1MB
}

struct Ano {
1: i64 Num,
}

字节跳动 Go RPC 框架 KiteX 性能优化实践

(反序列化)+读出数据,视包大小,Cap’n Proto 性能大约是 Thrift 的 8-9 倍。写入数据+(序列化),视包大小,Cap’n Proto 性能大约是 Thrift 的 2-8 倍。整体性能 Cap’ Proto 性能大约是 Thrift 的 4-8 倍。

前面说了 Cap’n Proto 的优势,下面总结一下 Cap’n Proto 存在的一些问题:

  1. Cap’n Proto 的连续内存存储这一特性带来的一个问题:当对不定大小数据进行 resize ,且需要的空间大于原有空间时,只能在后面重新分配一块空间,导致原来数据的空间成为了一个无法去掉的 hole 。这个问题随着调用链路的不断 resize 会越来越严重,要解决只能在整个链路上严格约束:尽量避免对不定大小字段的 resize ,当不得不 resize 的时候,重新构建一个结构体并对数据进行深拷贝。

  2. Cap’n Proto 因为没有 Go 语言结构体作为中间载体,使得所有的字段都只能通过接口进行读写,用户体验较差。

Thrift 协议兼容的无拷贝序列化

Cap’n Proto 为了更好更高效地支持无拷贝序列化,使用了一套自研的编解码格式,但在现在 Thrift 和 ProtoBuf 占主流的环境中难以铺开。为了能在协议兼容的同时获得无拷贝序列化的性能,我们开始了 Thrift 协议兼容的无拷贝序列化的探索。

Cap’n Proto 作为无拷贝序列化的标杆,那么我们就看看 Cap’n Proto 上的优化能否应用到 Thrift 上:

  1. 自然是无拷贝序列化的核心,不使用 Go 语言结构体作为中间载体,减少一次拷贝。此优化点是协议无关的,能够适用于任何已有的协议,自然也能和 Thrift 协议兼容,但是从 Cap’n Proto 的使用上来看,用户体验还需要仔细打磨一下。

  2. Cap’n Proto 是在一段连续内存上进行操作,编码数据的读写可以一次完成。Cap’n Proto 得以在连续内存上操作的原因:有 pointer 机制,数据可以存储在任意位置,允许字段可以以任意顺序写入而不影响解码。但是一方面,在连续内存上容易因为误操作,导致在 resize 的时候留下 hole,另一方面,Thrift 没有类似于 pointer 的机制,故而对数据布局有着更严格的要求。这里有两个思路:

    1. 坚持在连续内存上进行操作,并对用户使用提出严格要求:1. resize 操作必须重新构建数据结构 2. 当存在结构体嵌套时,对字段写入顺序有着严格要求(可以想象为把一个存在嵌套的结构体从外往里展开,写入时需要按展开顺序写入),且因为 Binary 等 TLV 编码的关系,在每个嵌套开始写入时,需要用户主动声明(如 StartWriteFieldX)。
    2. 不完全在连续内存上操作,局部内存连续,可变字段则单独分配一块内存,既然内存不是完全连续的,自然也无法做到一次写操作便完成输出。为了尽可能接近一次写完数据的性能,我们采取了一种链式 buffer 的方案,一方面当可变字段 resize 时只需替换链式 buffer 的一个节点,无需像 Cap’n Proto 一样重新构建结构体,另一方面在需要输出时无需像 Thrift 一样需要感知实际的结构,只要把整个链路上的 buffer 写入即可。

先总结下目前确定的两个点:1. 不使用 Go 语言结构体作为中间载体,通过接口直接操作底层内存,在 Get/Set 时完成编解码 2. 通过链式 buffer 存储数据

然后让我们看下目前还有待解决的问题:

  1. 不使用 Go 语言结构体后带来的用户体验劣化

    1. 解决方案:改善 Get/Set 接口的使用体验,尽可能做到和 Go 语言结构体同等的易用
  2. Cap’n Proto 的 Binary Format 是针对无拷贝序列化场景专门设计的,虽然每次 Get 时都会进行一次解码,但是解码代价非常小。而 Thrift 的协议(以 Binary 为例),没有类似于 pointer 的机制,当存在多个不定大小字段或者存在嵌套时,必须顺序解析而无法直接通过计算偏移拿到字段数据所在的位置,而每次 Get 都进行顺序解析的代价过于高昂。

    1. 解决方案:我们在表示结构体的时候,除了记录结构体的 buffer 节点,还加了一个索引,里面记录了每个不定大小字段开始的 buffer 节点的指针。

下面是目前的无拷贝序列化方案与 FastRead/Write,在 4 核下的极限性能对比测试:

字节跳动 Go RPC 框架 KiteX 性能优化实践

测试结果概述:

  1. 小包场景,无序列化性能表现较差,约为 FastWrite/FastRead 的 85%。

  2. 大包场景,无序列化性能表现较好,4K 以上的包较 FastWrite/FastRead 提升 7%-40%。

后记

希望以上的分享能够对社区有所帮助。同时,我们也在尝试 share memory-based IPC、io_uring、tcp zero copy 、RDMA 等,更好地提升 KiteX 性能;重点优化同机、同容器的通讯场景。欢迎各位感兴趣的同学加入我们,共同建设 Go 语言生态!

参考资料

  1. https://github.com/alecthomas/go_serialization_benchmarks

  2. https://capnproto.org/

  3. https://software.intel.com/content/www/us/en/develop/documentation/cpp-compiler-developer-guide-and-reference/top/compiler-reference/intrinsics/intrinsics-for-intel-advanced-vector-extensions-2/intrinsics-for-shuffle-operations-1/mm256-shuffle-epi8.html

基于SSD的Kafka应用层缓存架构设计与实现

Kafka出色的I/O优化以及多处异步化设计,相比其他消息队列系统具有更高的吞吐,同时能够保证不错的延迟,十分适合应用在整个大数据生态中。

目前在美团数据平台中,Kafka承担着数据缓冲和分发的角色。如下图所示,业务日志、接入层Nginx日志或线上DB数据通过数据采集层发送到Kafka,后续数据被用户的实时作业消费、计算,或经过数仓的ODS层用作数仓生产,还有一部分则会进入公司统一日志中心,帮助工程师排查线上问题。

基于SSD的Kafka应用层缓存架构设计与实现

目前美团线上Kafka规模:

  • 集群规模:节点数达6000+,集群数100+。
  • 集群承载:Topic数6万+,Partition数41万+。
  • 处理的消息规模:目前每天处理消息总量达8万亿,峰值流量为1.8亿条/秒
  • 提供的服务规模:目前下游实时计算平台运行了3万+作业,而这其中绝大多数的数据源均来自Kafka。

Kafka线上痛点分析&核心目标

当前Kafka支撑的实时作业数量众多,单机承载的Topic和Partition数量很大。这种场景下很容易出现的问题是:同一台机器上不同Partition间竞争PageCache资源,相互影响,导致整个Broker的处理延迟上升、吞吐下降。

接下来,我们将结合Kafka读写请求的处理流程以及线上统计的数据来分析一下Kafka在线上的痛点。

原理分析

基于SSD的Kafka应用层缓存架构设计与实现
Kafka处理读写流程的示意图

对于Produce请求:Server端的I/O线程统一将请求中的数据写入到操作系统的PageCache后立即返回,当消息条数到达一定阈值后,Kafka应用本身或操作系统内核会触发强制刷盘操作(如左侧流程图所示)。

对于Consume请求:主要利用了操作系统的ZeroCopy机制,当Kafka Broker接收到读数据请求时,会向操作系统发送sendfile系统调用,操作系统接收后,首先试图从PageCache中获取数据(如中间流程图所示);如果数据不存在,会触发缺页异常中断将数据从磁盘读入到临时缓冲区中(如右侧流程图所示),随后通过DMA操作直接将数据拷贝到网卡缓冲区中等待后续的TCP传输。

综上所述,Kafka对于单一读写请求均拥有很好的吞吐和延迟。处理写请求时,数据写入PageCache后立即返回,数据通过异步方式批量刷入磁盘,既保证了多数写请求都能有较低的延迟,同时批量顺序刷盘对磁盘更加友好。处理读请求时,实时消费的作业可以直接从PageCache读取到数据,请求延迟较小,同时ZeroCopy机制能够减少数据传输过程中用户态与内核态的切换,大幅提升了数据传输的效率。

但当同一个Broker上同时存在多个Consumer时,就可能会由于多个Consumer竞争PageCache资源导致它们同时产生延迟。下面我们以两个Consumer为例详细说明:

基于SSD的Kafka应用层缓存架构设计与实现

如上图所示,Producer将数据发送到Broker,PageCache会缓存这部分数据。当所有Consumer的消费能力充足时,所有的数据都会从PageCache读取,全部Consumer实例的延迟都较低。此时如果其中一个Consumer出现消费延迟(图中的Consumer Process2),根据读请求处理流程可知,此时会触发磁盘读取,在从磁盘读取数据的同时会预读部分数据到PageCache中。当PageCache空间不足时,会按照LRU策略开始淘汰数据,此时延迟消费的Consumer读取到的数据会替换PageCache中实时的缓存数据。后续当实时消费请求到达时,由于PageCache中的数据已被替换掉,会产生预期外的磁盘读取。这样会导致两个后果:

  1. 消费能力充足的Consumer消费时会失去PageCache的性能红利。
  2. 多个Consumer相互影响,预期外的磁盘读增多,HDD负载升高。

我们针对HDD的性能和读写并发的影响做了梯度测试,如下图所示:

基于SSD的Kafka应用层缓存架构设计与实现

可以看到,随着读并发的增加,HDD的IOPS和带宽均会明显下降,这会进一步影响整个Broker的吞吐以及处理延迟。

线上数据统计

目前Kafka集群TP99流量在170MB/s,TP95流量在100MB/s,TP50流量为50-60MB/s;单机的PageCache平均分配为80GB,取TP99的流量作为参考,在此流量以及PageCache分配情况下,PageCache最大可缓存数据时间跨度为80*1024/170/60 = 8min,可见当前Kafka服务整体对延迟消费作业的容忍性极低。该情况下,一旦部分作业消费延迟,实时消费作业就可能会受到影响。

同时,我们统计了线上实时作业的消费延迟分布情况,延迟范围在0-8min(实时消费)的作业只占80%,说明目前存在线上存在20%的作业处于延迟消费的状态。

痛点分析总结

总结上述的原理分析以及线上数据统计,目前线上Kafka存在如下问题:

  1. 实时消费与延迟消费的作业在PageCache层次产生竞争,导致实时消费产生非预期磁盘读。
  2. 传统HDD随着读并发升高性能急剧下降。
  3. 线上存在20%的延迟消费作业。

按目前的PageCache空间分配以及线上集群流量分析,Kafka无法对实时消费作业提供稳定的服务质量保障,该痛点亟待解决。

预期目标

根据上述痛点分析,我们的预期目标为保证实时消费作业不会由于PageCache竞争而被延迟消费作业影响,保证Kafka对实时消费作业提供稳定的服务质量保障。

解决方案

为什么选择SSD

根据上述原因分析可知,解决目前痛点可从以下两个方向来考虑:

  1. 消除实时消费与延迟消费间的PageCache竞争,如:让延迟消费作业读取的数据不回写PageCache,或增大PageCache的分配量等。
  2. 在HDD与内存之间加入新的设备,该设备拥有比HDD更好的读写带宽与IOPS。

对于第一个方向,由于PageCache由操作系统管理,若修改其淘汰策略,那么实现难度较为复杂,同时会破坏内核本身对外的语义。另外,内存资源成本较高,无法进行无限制的扩展,因此需要考虑第二个方向。

SSD目前发展日益成熟,相较于HDD,SSD的IOPS与带宽拥有数量级级别的提升,很适合在上述场景中当PageCache出现竞争后承接部分读流量。我们对SSD的性能也进行了测试,结果如下图所示:

基于SSD的Kafka应用层缓存架构设计与实现

从图中可以发现,随着读取并发的增加,SSD的IOPS与带宽并不会显著降低。通过该结论可知,我们可以使用SSD作为PageCache与HDD间的缓存层。

架构决策

在引入SSD作为缓存层后,下一步要解决的关键问题包括PageCache、SSD、HDD三者间的数据同步以及读写请求的数据路由等问题,同时我们的新缓存架构需要充分匹配Kafka引擎读写请求的特征。本小节将介绍新架构如何在选型与设计上解决上述提到的问题。

Kafka引擎在读写行为上具有如下特性:

  • 数据的消费频率随时间变化,越久远的数据消费频率越低。
  • 每个分区(Partition)只有Leader提供读写服务。
  • 对于一个客户端而言,消费行为是线性的,数据并不会重复消费。

下文给出了两种备选方案,下面将对两种方案给出我们的选取依据与架构决策。

备选方案一:基于操作系统内核层实现

目前开源的缓存技术有FlashCache、BCache、DM-Cache、OpenCAS等,其中BCache和DM-Cache已经集成到Linux中,但对内核版本有要求,受限于内核版本,我们仅能选用FlashCache/OpenCAS。

如下图所示,FlashCache以及OpenCAS二者的核心设计思路类似,两种架构的核心理论依据为“数据局部性”原理,将SSD与HDD按照相同的粒度拆成固定的管理单元,之后将SSD上的空间映射到多块HDD层的设备上(逻辑映射or物理映射)。在访问流程上,与CPU访问高速缓存和主存的流程类似,首先尝试访问Cache层,如果出现CacheMiss,则会访问HDD层,同时根据数据局部性原理,这部分数据将回写到Cache层。如果Cache空间已满,会通过LRU策略替换部分数据。

基于SSD的Kafka应用层缓存架构设计与实现

FlashCache/OpenCAS提供了四种缓存策略:WriteThrough、WriteBack、WriteAround、WriteOnly。由于第四种不做读缓存,这里我们只看前三种。

写入:

  • WriteThrough:数据写操作在写入SSD的同时会写入到后端存储。
  • WriteBack:数据写操作仅写入SSD即返回,由缓存策略flush到后台存储。
  • WriteAround:数据写入操作直接写入后端存储,同时SSD对应的缓存会失效。

读取:

  • WriteThrough/WriteBack/WriteAround:首先读取SSD,命中不了的将再次读取后端存储,并数据会被刷入到SSD缓存中。
基于SSD的Kafka应用层缓存架构设计与实现

更多详细实现细节,极大可参见这二者的官方文档:

备选方案二:Kafka应用内部实现

上文提到的第一类备选方案中,核心的理论依据“数据局部性”原理与Kafka的读写特性并不能完全吻合,“数据回刷”这一特性依然会引入缓存空间污染问题。同时,上述架构基于LRU的淘汰策略也与Kafka读写特性存在矛盾,在多Consumer并发消费时,LRU淘汰策略可能会误淘汰掉一些近实时数据,导致实时消费作业出现性能抖动。

可见,备选方案一并不能完全解决当前Kafka的痛点,需要从应用内部进行改造。整体设计思路如下,将数据按照时间维度分布在不同的设备中,近实时部分的数据缓存在SSD中,这样当出现PageCache竞争时,实时消费作业从SSD中读取数据,保证实时作业不会受到延迟消费作业影响。下图展示了基于应用层实现的架构处理读请求的流程:

基于SSD的Kafka应用层缓存架构设计与实现

当消费请求到达Kafka Broker时,Kafka Broker直接根据其维护的消息偏移量(Offset)和设备的关系从对应的设备中获取数据并返回,并且在读请求中并不会将HDD中读取的数据回刷到SSD,防止出现缓存污染。同时访问路径明确,不会由于Cache Miss而产生的额外访问开销。

下表对不同候选方案进行了更加详细的对比:

基于SSD的Kafka应用层缓存架构设计与实现

最终,结合与Kafka读写特性的匹配度,整体工作量等因素综合考虑,我们采用Kafka应用层实现这一方案,因为该方案更贴近Kafka本身读写特性,能更加彻底地解决Kafka的痛点。

新架构设计

概述

根据上文对Kafka读写特性的分析,我们给出应用层基于SSD的缓存架构的设计目标:

  • 数据按时间维度分布在不同的设备上,近实时数据分布在SSD上,随时间的推移淘汰到HDD上。
  • Leader分区中所有数据均写入SSD中。
  • 从HDD中读取的数据不回刷到SSD中。

依据上述目标,我们给出应用层基于SSD的Kafka缓存架构实现:

Kafka中一个Partition由若干LogSegment构成,每个LogSegment包含两个索引文件以及日志消息文件。一个Partition的若干LogSegment按Offset(相对时间)维度有序排列。

基于SSD的Kafka应用层缓存架构设计与实现

根据上一小节的设计思路,我们首先将不同的LogSegment标记为不同的状态,如图所示(图中上半部分)按照时间维度分为OnlyCache、Cached以及WithoutCache三种常驻状态。而三种状态的转换以及新架构对读写操作的处理如图中下半部分所示,其中标记为OnlyCached状态的LogSegment只存储在SSD上,后台线程会定期将Inactive(没有写流量)的LogSegment同步到SSD上,完成同步的LogSegment被标记为Cached状态。

最后,后台线程将会定期检测SSD上的使用空间,当空间达到阈值时,后台线程将会按照时间维度将距离现在最久的LogSegment从SSD中移除,这部分LogSegment会被标记为WithoutCache状态。

对于写请求而言,写入请求依然首先将数据写入到PageCache中,满足阈值条件后将会刷入SSD。对于读请求(当PageCache未获取到数据时),如果读取的offset对应的LogSegment的状态为Cached或OnlyCache,则数据从SSD返回(图中LC2-LC1以及RC1),如果状态为WithoutCache,则从HDD返回(图中LC1)。

对于Follower副本的数据同步,可根据Topic对延迟以及稳定性的要求,通过配置决定写入到SSD还是HDD。

关键优化点

上文介绍了基于SSD的Kafka应用层缓存架构的设计概要以及核心设计思路,包括读写流程、内部状态管理以及新增后台线程功能等。本小节将介绍该方案的关键优化点,这些优化点均与服务的性能息息相关。主要包括LogSegment同步以及Append刷盘策略优化,下面将分别进行介绍。

LogSegment同步

LogSegment同步是指将SSD上的数据同步到HDD上的过程,该机制在设计时主要有以下两个关键点:

  1. 同步的方式:同步方式决定了HDD上对SSD数据的可见时效性,从而会影响故障恢复以及LogSegment清理的及时性。
  2. 同步限速:LogSegment同步过程中通过限速机制来防止同步过程中对正常读写请求造成影响

同步方式

关于LogSegment的同步方式,我们给出了三种备选方案,下表列举了三种方案的介绍以及各自的优缺点:

基于SSD的Kafka应用层缓存架构设计与实现

最终,我们对一致性维护代价、实现复杂度等因素综合考虑,选择了后台同步Inactive的LogSegment的方式。

同步限速

LogSegment同步行为本质上是设备间的数据传输,会同时在两个设备上产生额外的读写流量,占用对应设备的读写带宽。同时,由于我们选择了同步Inactive部分的数据,需要进行整段的同步。如果在同步过程中不加以限制会对服务整体延迟造成较大的影响,主要表现在下面两个方面:

  • 从单盘性能角度,由于SSD的性能远高于HDD,因此在数据传输时,HDD写入带宽会被写满,此时其他的读写请求会出现毛刺,如果此时有延迟消费从HDD上读取数据或者Follower正在同步数据到HDD上,会造成服务抖动。
  • 从单机部署的角度,单机会部署2块SSD与10块HDD,因此在同步过程中,1块SSD需要承受5块HDD的写入量,因此SSD同样会在同步过程中出现性能毛刺,影响正常的请求响应延迟。

基于上述两点,我们需要在LogSegment同步过程中增加限速机制,总体的限速原则为在不影响正常读写请求延迟的情况下尽可能快速地进行同步。因为同步速度过慢会导致SSD数据无法被及时清理而最终被写满。同时为了可以灵活调整,该配置也被设置为单Broker粒度的配置参数。

日志追加刷盘策略优化

除了同步问题,数据写入过程中的刷盘机制同样影响服务的读写延迟。该机制的设计不仅会影响新架构的性能,对原生Kafka同样会产生影响。

下图展示了单次写入请求的处理流程:

基于SSD的Kafka应用层缓存架构设计与实现

在Produce请求处理流程中,首先根据当前LogSegment的位置与请求中的数据信息确定是否需要滚动日志段,随后将请求中的数据写入到PageCache中,更新LEO以及统计信息,最后根据统计信息确定是否需要触发刷盘操作,如果需要则通过fileChannel.force强制刷盘,否则请求直接返回。

在整个流程中,除日志滚动与刷盘操作外,其他操作均为内存操作,不会带来性能问题。日志滚动涉及文件系统的操作,目前,Kafka中提供了日志滚动的扰动参数,防止多个Segment同时触发滚动操作给文件系统带来压力。针对日志刷盘操作,目前Kafka给出的机制是以固定消息条数触发强制刷盘(目前线上为50000),该机制只能保证在入流量一定时,消息会以相同的频率刷盘,但无法限制每次刷入磁盘的数据量,对磁盘的负载无法提供有效的限制。

如下图所示,为某磁盘在午高峰时间段write_bytes的瞬时值,在午高峰时间段,由于写入流量的上升,在刷盘过程中会产生大量的毛刺,而毛刺的值几乎接近磁盘最大的写入带宽,这会使读写请求延迟发生抖动。

基于SSD的Kafka应用层缓存架构设计与实现

针对该问题,我们修改了刷盘的机制,将原本的按条数限制修改为按实际刷盘的速率限制,对于单个Segment,刷盘速率限制为2MB/s。该值考虑了线上实际的平均消息大小,如果设置过小,对于单条消息较大的Topic会过于频繁的进行刷新,在流量较高时反而会加重平均延迟。目前该机制已在线上小范围灰度,右图展示了灰度后同时间段对应的write_bytes指标,可以看到相比左图,数据刷盘速率较灰度前明显平滑,最高速率仅为40MB/s左右。

对于SSD新缓存架构,同样存在上述问题,因此在新架构中,在刷盘操作中同样对刷盘速率进行了限制。

方案测试

测试目标

  • 验证基于应用层的SSD缓存架构能够避免实时作业受到延迟作业的影响。
  • 验证相比基于操作系统内核层实现的缓存层架构,基于应用层的SSD架构在不同流量下读写延迟更低。

测试场景描述

  • 构建4个集群:新架构集群、普通HDD集群、FlashCache集群、OpenCAS集群。
  • 每个集群3个节点。
  • 固定写入流量,比较读、写耗时。
  • 延迟消费设置:只消费相对当前时间10~150分钟的数据(超过PageCache的承载区域,不超过SSD的承载区域)。

测试内容及重点关注指标

  • Case1: 仅有延迟消费时,观察集群的生产和消费性能。
  • 重点关注的指标:写耗时、读耗时,通过这2个指标体现出读写延迟。
  • 命中率指标:HDD读取量、HDD读取占比(HDD读取量/读取总量)、SSD读取命中率,通过这3个指标体现出SSD缓存的命中率。
  • Case2: 存在延迟消费时,观察实时消费的性能。
  • 重点指标:实时作业的SLA(服务质量)的5个不同时间区域的占比情况。

测试结果

从单Broker请求延迟角度看:

在刷盘机制优化前,SSD新缓存架构在所有场景下,较其他方案都具有明显优势。

基于SSD的Kafka应用层缓存架构设计与实现

刷盘机制优化后,其余方案在延迟上服务质量有提升,在较小流量下由于Flush机制的优化,新架构与其他方案的优势变小。当单节点写入流量较大时(大于170MB)优势明显。

基于SSD的Kafka应用层缓存架构设计与实现

从延迟作业对实时作业的影响方面看:

新缓存架构在测试所涉及的所有场景中,延迟作业都不会对实时作业产生影响,符合预期。

基于SSD的Kafka应用层缓存架构设计与实现

总结与未来展望

Kafka在美团数据平台承担统一的数据缓存和分发的角色,针对目前由于PageCache互相污染、进而引发PageCache竞争导致实时作业被延迟作业影响的痛点,我们基于SSD自研了Kafka的应用层缓存架构。本文主要介绍Kafka新架构的设计思路以及与其他开源方案的对比。与普通集群相比,新缓存架构具备非常明显的优势:

  1. 降低读写耗时:比起普通集群,新架构集群读写耗时降低80%。
  2. 实时消费不受延迟消费的影响:比起普通集群,新架构集群实时读写性能稳定,不受延时消费的影响。

目前,这套缓存架构优已经验证完成,正在灰度阶段,未来也优先部署到高优集群。其中涉及的代码也将提交给Kafka社区,作为对社区的回馈,也欢迎大家跟我们一起交流。

作者简介

世吉,仕禄,均为美团数据平台工程师。

滴滴从 KV 存储到 NewSQL 实战

Fusion-NewSQL 是由滴滴自研的在分布式 KV 存储基础上构建的 NewSQL 存储系统。Fusion-NewSQL 兼容了 MySQL 协议,支持二级索引功能,提供超大规模数据持久化存储和高性能读写。

一. 遇到的问题

滴滴的业务快速持续发展,数据量和请求量急剧增长,对存储系统等压力与日俱增。虽然分库分表在一定程度上可以解决数据量和请求增加的需求,但是由于滴滴多条业务线(快车,专车,两轮车等)的业务快速变化,数据库加字段加索引的需求非常频繁,分库分表方案对于频繁的 Schema 变更操作并不友好,会导致 DBA 任务繁重,变更周期长,并且对巨大的表操作还会对线上有一定影响。同时,分库分表方案对二级索引支持不友好或者根本不支持。

鉴于上述情况,NewSQL 数据库方案就成为我们解决业务问题的一个方向。

二. 开源产品调研

最开始,我们调研了开源的分布式 NewSQL 方案:TiDB。虽然 TiDB 是非常优秀的 NewSQL 产品,但是对于我们的业务场景来说,TiDB 并不是非常适合,原因如下:

  • 我们需要一款高吞吐,低延迟的数据库解决方案,但是 TiDB 由于要满足事务,2pc 方案天然无法满足低延迟(100ms 以内的 99rt,甚至 50ms 内的 99rt)
  • 我们的多数业务,并不真正需要分布式事务,或者说可以通过其他补偿机制,绕过分布式事务。这是由于业务场景决定的。
  • TiDB 三副本的存储空间成本相对比较高。
  • 我们内部一些离线数据导入在线系统的场景,不能直接和 TiDB 打通。

基于以上原因,我们开启了自研符合自己业务需求的 NewSQL 之路。

三. 我们的基础

我们并没有打算从 0 开发一个完备的 NewSQL 系统,而是在自研的分布式 KV 存储 Fusion 的基础上构建一个能满足我们业务场景的 NewSQL。Fusion 是采用了 Codis 架构,兼容 Redis 协议和数据结构,使用 RocksDB 作为存储引擎的 NoSQL 数据库。Fusion 在滴滴内部已经有几百个业务在使用,是滴滴主要的在线存储之一。

Fusion 的架构图如下:
滴滴从 KV 存储到 NewSQL 实战
我们采用 hash 分片的方式来做数据 sharding。从上往下看,用户通过 Redis 协议的客户端就可以访问 Fusion,用户的访问请求发到 proxy,再由 proxy 转发数据到后端 Fusion 的数据节点。proxy 到后端数据节点的转发,是根据请求的 key 计算 hash 值,然后对 slot 分片数取余,得到一个固定的 slotid,每个 slotid 会固定的映射到一个存储节点,以此解决数据路由问题。

有了一个高并发,低延迟,大容量的存储层后,我们要做的就是在之上构建 MySQL 协议以及二级索引。

需求

滴滴从 KV 存储到 NewSQL 实战

综合考虑大多数用户对需求,我们整理了我们的 NewSQL 需要提供的几个核心能力:

  • 高吞吐,低延迟,大容量
  • 兼容 MySQL 协议及下游生态
  • 支持主键查询和二级索引查询
  • Schema 变更灵活,不影响线上服务稳定性。

架构设计

Fusion-NewSQL 由下面几个部分组成:

1. 解析 MySQL 协议的 DiseServer
2. 存储数据的 Fusion 集群 -Data 集群
3. 存储索引信息的 Fusion 集群 -Index 集群
4. 负责 Schema 的管理配置中心 -ConfigServer
5. 异步构建索引程序 -Consumer 负责消费 Data 集群写到 MQ 中的 MySQL-Binlog 格式数据,根据 schema 信息,生成索引数据写入 Index 集群。
6. 外部依赖,MQ,Zookeeper

架构图如下:
滴滴从 KV 存储到 NewSQL 实战

技术挑战及方案

1.SQL 表转 Hashmap

MySQL 的表结构数据如何转成 Redis 的数据结构是我们面临的第一个问题。

如下图:
滴滴从 KV 存储到 NewSQL 实战
我们将 MySQL 表的一行记录转成 Redis 的一个 Hashmap 结构。Hashmap 的 key 由表名 + 主键值组成,满足了全局唯一的特性。下图展示了 MySQL 通过主键查询转换为 Redis 协议的方式:

滴滴从 KV 存储到 NewSQL 实战
除了数据,索引也需要存储在 Fusion-NewSQL 中,和数据存成 hashmap 不同,索引存储成 key-value 结构。根据索引类型不同,组成 key-value 的格式还有一点细微的差别 (下面的格式为了看起来直观,实际上分隔符,indexname 都是做过编码的):

1. 唯一索引:

Key: table_indexname_indexColumnsValue Value: Rowkey

2. 非唯一索引:

Key: table_indexname_indexColumnsValue_Rowkey Value:null
造成这种差异的原因就是非唯一索引在加入 Rowkey 之前的部分是有可能重复的,无法全局唯一。另外,唯一索引不将 Rowkey 编码在 key 中,是因为在查询语句是单纯的“=”查询的时候直接 get 操作就可以找到对应的 Rowkey 内容,而不需要通过 scan,这样的效率更高。

滴滴从 KV 存储到 NewSQL 实战
后面会在查询流程中重点讲述如何通过二级索引查询到数据。

2. 数据和索引一致性

因为数据和索引分别存储在不同 Fusion 集群,数据和索引的一致性保证就成了 Fusion-New 系统面临的一个关键点,在没有分布式事务的情况下,我们当前选择了保证数据索引的最终一致性。用户写入数据在数据集群中开启 RocksDB 的单机事物,同时按链接保序,这样数据流入 MQ 的时候就是有序的。异步模块从 MQ 中消费出来再批量写入到索引集群,整个流程就保证的索引数据的构建与数据集群真实的顺序一致。当然,这中间存在一个时间窗口的数据不一致,这个时间取决于 MQ 的吞吐能力。

3. 二级索引查询

下面是一个使用二级索引查询数据的案例:
dise-server 会根据用户查询条件和当前所有索引做匹配,找到符合的索引,然后通过 Redis 的 scan 命令,按前缀搜索 index 集群的数据,获取符合条件的主键。

如下图:
滴滴从 KV 存储到 NewSQL 实战
通过主键,可以直接到 Data 集群查到相应的数据。

根据上面索引数据的格式可以看到,scan 范围的时候,前缀必须固定,映射到 SQL 语句到时候,意味着 where 到条件中,范围查询只能有一个字段,而不能多个字段。

比如:
滴滴从 KV 存储到 NewSQL 实战
索引是 age 和 name 两个字段的联合索引。
如果查询语句如下:
select * from student where age > 20 and name >‘W’;
scan 就没有办法确定前缀,也就无法通过 index_age_name 这个索引查询到满足条件的数据,所以使用 KV 形式存储到索引只能满足 where 条件中有一个字段是范围查询。当然可以通过将联合索引分开存放,多次交互搜索取交集的方式解决,但是这就和我们减少 RPC 次数,降低延迟的设计初衷相违背了。为了解决这个问题,我们引入了 Elastic Search 搜索引擎。

架构图如下:
滴滴从 KV 存储到 NewSQL 实战
我们建议用户将需要复杂查询的字段设置为 ES 索引,consumer 消费 MQ 的时候将这些字段数据写一份到 ES 中,这样对于对查询条件简单,延迟敏感的查询,使用 Index 集群的数据;对条件复杂,延迟不敏感的查询使用 ES。这样解决了二级索引功能丰富性问题。

4. 生态构建

一个单独的存储产品解决所有问题的时代早已经过去,数据孤岛是没有办法很好服务业务的,如何与滴滴现有个各个数据系统打通数据,成了我们必须面对的问题。下面分数据流出到其他系统和从其他系统导入两个方面来阐述 Fusion-NewSQL 的数据流动方案。

4.1. Fusion-NewSQL 到其他存储系统

Fusion-NewSQL 是一个新系统,没办法短时间让各个数据系统为我们做适配。既然 Fusion-NewSQL 已经有了 Schema 信息,那么通过兼容 MySQL 的 Binlog 格式,将 Fusion-NewSQL 在数据链路中伪装成 MySQL,就可以直接使用 Mysql 的下游数据流动链路。这样的方式用最小的工作量最大程度做到了兼容。

4.2.Hive 到 Fusion-NewSQL

Fusion-NewSQL 还支持将离线的 Hive 表中的数据通过 Fusion-NewSQL 提供的 FastLoad(DTS)工具,将 Hive 表数据转入到 Fusion-NewSQL,满足离线数据到在线的数据流动。

如果用户自己完成数据流转,一般会扫描 Hive 表,然后构建 MySQL 的写入语句,一条条将数据写入到 Fusion-NewSQL,

流程如下面这样:
滴滴从 KV 存储到 NewSQL 实战
从上面的流程可以看出这种迁移方式有几个问题:

1. 每条 Hive 数据都要经过较长链路,数据导入耗时较长。

2. 离线平台的数据量大,吞吐高,数据导入直接大幅提升在线系统的 QPS,对在线系统的稳定性有较大影响。

从上面的痛点可以看出来,主要的问题是离线数据导入使用了在线系统复杂的 IO 链路。所以如何绕过在线的长 IO 链路,做批量导入就成了解决这个问题的关键。我们设计了 Fastload 数据导入平台,绕过在线 IO 路径

流程如下:
滴滴从 KV 存储到 NewSQL 实战
通过 Hadoop 并行计算,将需要导入的 Hive 数据直接构建成 Fusion-NewSQL 能识别的 sst 文件。Fusion-NewSQL 直接将 sst 文件从远端下载到本地,然后使用存储节点通过 Rocksdb 提供 ingest 功能,直接将 sst 文件加载到 Fusion-NewSQL 中,用户可以读到加载到 sst 文件中的数据。通过这样的预先构建 sst 文件,直接文件网络传输和存储引擎直接加载的步骤,就避免了数据导入走在线 IO 复杂流程,解决了稳定性问题,同时将数据导入耗时减少到原来的 1/10。

总结

通过解决上面的技术点,我们用了较小的代价,构建了一个基于 KV 存储的 NewSQL 系统,并且快速将 Fusion-NewSQL 系统接入到滴滴整体的数据链路中。虽然这不是一个完备的 NewSQL 系统,但已经可以满足大多数业务场景的需要,切实实现了 20% 工作量满足 80% 功能的需求。当前 Fusion-NewSQL 已经接入订单、预估、账单、用户中心、交易引擎等核心业务,总的数据如下图:

滴滴从 KV 存储到 NewSQL 实战

后续工作

  • 有限制的事物支持,比如让业务规划落在一个节点的数据可以支持单机跨行事务;
  • 实时索引替代异步索引,满足即写即读。目前已经有一个写穿 + 补偿机制的方案,在没有分布式事务的前提下满足正常状态的实时索引,异常情况下保证数据索引最终一致的方案;
  • 更多的 SQL 协议和功能支持。