在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
1. 相关背景搜索事业部与计算平台事业部目前使用消息队列主要有以下三种场景: 1. 每天有上万张表需要通过Build Service来构建索引。这些表主要来自主搜索,IGRAPH,Rank Service等业务,且每个表包含的文档数差别很大。总数据量为PB级别,总文档数达万亿级。文档的大小不一,小到几十Byte大到几百KB。在Build Service内部,文档处理与索引构建需要一个消息队列来传送消息。因此在build时,容易产生突发大流量(几百G/秒,几千万条/秒)持续消息写入与读取。 2. 搜索的在线服务如主搜索查询服务,RankService打分服务或IGRAPH服务需要毫秒级的实时文档更新。这些服务引擎基本上是多行多列结构,即每一行是一个完整的服务单元,由多台机器组成,多行提升服务的总能力。大的服务通常包含数百行,所以一条实时消息通常会被消费数百次,在线同时实时消费的机器规模也达上万台。 3. 在线的实时消息主要来自离线实时模型训练,用户的实时浏览、点击、加购行为或者商家的增删改宝贝等。离线训练任务会同时启动几十万个worker对上千张表产生实时消息,写请求每秒达千万次。 对于这几种场景,传统的消息队列(如Kafka等)要同时满足,至少需要成百上千台物理机,且系统还需要做改造来适用于每天上万个topic的增减和几十万的生产者与消费者同时读写消息。另外这些机器的failover管理也是个大问题。现实中,搜索团队所有的机器都是由调度系统统一管理和复用,没有专门的物理机可给消息系统独占使用。 2. Swift介绍 传统的消息队列通常为消息的安全性,一般先要求消息落盘到本机后才返回成功。这限制的机器的迁移,扩展和复用。因为消息数据只存一两台机器,机器迁移必然导致数据的迁移。传统消息队列要有较高的性能,通常先要解写磁盘的毛刺等io问题。特别是机器与其它应用复用时IO问题并不好解决。所以传统的消息队列一般要求机器独占使用。 2.1 Swift系统结构 Swift消息系统是在计算与存储分离上的一次尝试。它主要有以下特点: 图 1 SWIFT系统结构图 图1是SWIFT系统结构,其主要分成2种worker: Admin和Broker。Admin和Broker的资源分配与启动都是基于调度系统。目前支持Hippo与Yarn这两种调度系统,这2种worker都会有很多个实例,Broker worker都是等价的,Admin worker则有一个leader,其余的等价,这些worker一般在Docker容器中工作。 2.2 Swift Topic介绍 Swift系统中的topic与其它消息系统的类似,它是一堆相关消息的集合,通常由业务自定义。如图二所示,在swift中topic是由65536个逻辑分区组成,编号是[0 - 65535]。在Swift消息系统内部,topic是由partition组成的,每个partition负责一个range的逻辑partition读写。
图 2 swift topic数据写入示意图 3. 可靠消息读写机制先前提到传统的消息系统为了保证消息的可靠性,在写消息时需要先落盘,以防机器挂掉时,消息不丢失。Swift也提供类似的模式,但落盘的对象是分布式文件系统如HDFS。这种模式下正常写落盘消息延时的毫秒级,当HDFS压力大时,会变成秒级,所以其性能不太稳定。Swift 设计了一种client与broker之间,broker与HDFS之间的消息写入与确认协议来保证消息高效可靠的写入与持久化,其机制类似TCP的滑动窗口协议。 图3是消息异步安全发送的示意图。Broker在分配到partition进行服务时,会生成一个标记,其由partition的版本号(V),broker加载partition时间戳(S)以及消息持久化的checkpoint (C)组成。Client在向admin定位到partition所在broker的时也会获取partition的版本号(V)。版本号V主要在topic属性发生变化时(例如partition的个数等)会更新。时间戳每次partition发生重加载或调度都会发生变化。 图 3 SWIFT异步安全发送消息示意图 异步安全写消息工作流程如下: 1. 用户通过客户端写入一条消息,client定位到写哪个物理partition,同时把消息写入到对应的buffer中。用户写消息时,还可以给每条消息设置一个递增编号,swift client会自动映射写消息进度与编号的关系。在异步模式的,client会有专门的提交线程与broker进行通信。 2. Client第一次向partition发送消息时,broker会验证partition的版本V0, 匹配后才会接受消息,同时会把三元组(V,S, C)返回。client收到accept消息后,会更新已接受消息的光标和协议的三元组信息。 3. 客户端可以持续的写入消息,同时broker那能partition中的消息做异步持久化,当持久化成功时,会更新持久化信息(Ca)。持久化成功的消息在内存中不会马上删除,只有内存不足时才会被回收。 4. Client的后台发送线程继续工作,发送消息b,同时请求带上了(V0,S0)。 5. Broker端验证(V0,S0),收受消息b,顺便把持久化信息也返回(V0,S0,Ca), client接收到accept信息后,更新已发送的光标到b,同时更新已接受的光标到a。消息a已经持久化成功,在使用的内存将会被writer回收。Writer更新checkpoint (Ca)给用户层,表示消息a已经持久化。 6. 同3一样,client继续写消息c,broker继续持久化消息b。 7. 此时partition发生了调度(例如被分配到了其它机器),其HDFS上的文件消息马上可以读取到,但内存中的消息会被清空。此时partition加载时间戳变成了S1。Client向admin重新定位到partition的服务broker写入的消息c和(V0,S0)。 8. Broker检查client发送的(V0,S0)与自身的(V0,S1)不相等,将拒绝此次消息的写入。主要基于消息在partition内要求保序考虑。此时client还不知道b是否被序列化成功,partition重新被加载b是否被序列化成功的信息也会被丢弃(无状态),所以它也不知道。因Broker返回(V0,S1,C0),要求client重新发送未持久化的所有消息。 9. Client 重置已发送光标到b之前,更新S1并重新发送消息b和c。 10. Broker检验client的(V0,S1)并收受消息b和c,这时消息b会被再次持久化化到HDFS上。Client重新更新已发送光标到c。如果此后无新消息的写入,且buffer中的消息还有未被持久化的,client会发起一次空写操作获取最新的持久化信息。 步骤1-10是异步消息写入的工作方式,用户层可以获取到当前持久化消息的checkpoint,可以自己记录发送进度以便回滚。如果不方便记录发送进度,可以在写完一段数据后,调用flush方法强制把数据从client的buffer放到broker的buffer中。此时消息虽然没有被持久化,但在client与partition各存一份。所以只有在broker与client同时挂掉才出现消息丢失,因此我们认为这种方法也是比较安全的。 4. Swift IN双十一 2017年swift消息系统开始在搜索事业部与计算平台事业部大规模应用,主要场景除主搜索外,还包括Porsche,K-monitor,IGraph, DII, OpenSearch,RankService等业务。另外,Swift在蚂蚁金服,阿里妈妈和神马事业部也有多套swift机群的部署。 5. 总结与展望Swift消息系统经过一年多的不断改进与优化,目前每天能处理PB级与万亿级的消息,但在不久的将来还需要解一些问题: 1. 超大量client如百万级的client写入,涉及到的partition定位与worker的连接问题。当client达百万时,首先碰到的一个问题是连接数不够用,目前离线的一个client写数据会对所有相关加载partition的worker产生连接,如果worker有N个,partition有M个,其连接数达N*M个。其次partition发生调度时,partition的定位瞬间能打爆Admin。 2.每秒百亿级别的消息读写时,如何减少系统处理消息量。Swift目前有client主动合并消息的优化,但能合并的消息数量并不多,能否在broker端进行消息合并与存储。在大规模消息读写时如何降低对HDFS的压力。Swift目前提供内存topic等来尽可能的减少消息落盘,是否有更好的机制也需要探索。 6. 相关职位招聘
岗位描述:参与阿里巴巴集团内部实时消息系统开发,支持每秒万亿级别消息读写,提供高可靠、高性能、高伸缩、低延时的服务,支撑电商、金融、物流、文娱、大数据、人工智能、搜索、广告等各种业务场景。 岗位要求:1. 精通C/C++语言和数据结构,算法和数据结构基础扎实 |
请发表评论