• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

兼容gorediscluster的pipeline批量

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

原文链接:兼容go redis cluster的pipeline批量

前言:

     redis cluster集群机制是不错,但因为是smart client设计,没有proxy中间层,导致很多redis批量命令在不同slot时不能适配,比如 mset、mget、pipeline等。 该篇文章讲述了redis cluster multi key批量操作的一些解决方案,尤其是golang的场景下。  

    该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5557

 

老生常谈的redis cluster概念

    Redis Cluster在设计中没有使用一致性哈希(Consistency Hashing),而是使用数据分片(Sharding)引入哈希槽(hash slot)来实现。一个 Redis Cluster包含16384(0~16383)个哈希槽,存储在Redis Cluster中的所有键都会被映射到这些slot中,集群中的每个键都属于这16384个哈希槽中的一个,集群使用公式slot=CRC16 key/16384来计算key属于哪个槽。

    集群中的每个主节点(Master)都负责处理16384个哈希槽中的一部分,当集群处于稳定状态时,每个哈希槽都只由一个主节点进行处理,每个主节点可以有一个到N个从节点。当主节点出现宕机或网络断线等不可用时,从节点能自动提升为主节点进行处理。

问题

不同slot区间的key可能在不同的节点上,如果在一个节点上执行不属于他slot区间的key会发生什么? 下面是可以会出现的error信息, redis cluster返回的错误很多,但核心异常信息不属于一个slot。

// xiaorui.cc

all keys must map to the same key slot
ERR CROSSSLOT Keys in request don't hash to the same slot

bad lua script for redis cluster, all the keys that the script uses should be passed using the KEYS array
eval/evalsha command keys must in same slot

 

redis cluster是个smart client设计模式,客户端会存有redis cluster的分片和节点关系的缓存。当你使用multi key的redis命令时 (mset、mget ),client通常会选择参数的第一个key的slot的主机进行发送。如果参数的多个key在不同的slot,在不同的主机上,那么必然会出错。 

 

 

解决方法

第一种:
最简单的方法是在所有的执行key前面加入hashtag,让他的key都在一个slot上。这样你的程序和架构简单了,带来的问题是数据倾斜,后面可能内存受限、qps受限。
第二种:

使用proxy方案做适配redis cluster集群, 像codis。滴滴、12306都在大量使用该代理。

第三种:
在客户端上实现命令的slot分组,然后分别并发处理。
 

第一种和第二种方法没什么说的,具体说下客户端怎么搞?先前有说过,客户端会缓存slot及node主机的关系,我们在客户端上根据参数keys做slot分离不就行了。像python的redis-py库包已帮你做了redis cluster的适配,对于上层代码无感知。golang其实也有这样的库,这里可以参考下 github.com/chasex/redis-go-cluster 代码, 这个是基于gomodule/redigo封装的redis集群客户端库。

 

redis-go-cluster处理入口, 劫持了MSET、MSETNX、MGET批量命令,根据slot分到不同的slice里,然后针对多个slice进行并发请求。

// xiaorui.cc

func (cluster *Cluster) Do(cmd string, args ...interface{}) (interface{}, error) {
    if len(args) < 1 {
    return nil, fmt.Errorf("Do: no key found in args")
    }

    if cmd == "MSET" || cmd == "MSETNX" {
    return cluster.multiSet(cmd, args...)
    }

    if cmd == "MGET" {
    return cluster.multiGet(cmd, args...)
    }
    ...
}


func (cluster *Cluster) multiSet(cmd string, args ...interface{}) (interface{}, error) {
    ...
    // 命令分组
    tasks := make([]*multiTask, 0)

    cluster.rwLock.RLock()
    for i := 0; i < len(args); i += 2 {
        key, err := key(args[i])
        if err != nil {
            cluster.rwLock.RUnlock()
            return nil, fmt.Errorf("multiSet: invalid key %v", args[i])
        }

        slot := hash(key)

        var j int
        for j = 0; j < len(tasks); j++ {
            // 相同的slot的key放在一个multiTask的slice里。
            if tasks[j].slot == slot {
                tasks[j].args = append(tasks[j].args, args[i])   // key
                tasks[j].args = append(tasks[j].args, args[i+1]) // value

                break
            }
        }

        if j == len(tasks) {
            node := cluster.slots[slot]
            if node == nil {
                cluster.rwLock.RUnlock()
                return nil, fmt.Errorf("multiSet: %s[%d] no node found", key, slot)
            }

            task := &multiTask{
                node: node,
                slot: slot,
                cmd:  cmd,
                args: []interface{}{args[i], args[i+1]},
                done: make(chan int),
            }
            tasks = append(tasks, task)
        }
    }
    cluster.rwLock.RUnlock()

    // 每个slot队列为为一个并发
    for i := range tasks {
        go handleSetTask(tasks[i])
    }

    // 确定都执行完
    for i := range tasks {
        <-tasks[i].done
    }

    for i := range tasks {
        _, err := String(tasks[i].reply, tasks[i].err)
        if err != nil {
            return nil, err
        }
    }

    return "OK", nil
}

type multiTask struct {
    node *redisNode
    slot uint16

    cmd  string
    args []interface{}

    reply   interface{}
    replies []interface{}
    err     error

    done chan int
}

下面是redis cluster pipeline批量实现, 跟mset mget的实现差不都,都是通过算出key的slot放在不同的slice里面,继而并发的使用pipeline。

// xiaorui.cc

// NewBatch create a new batch to pack mutiple commands.
func (cluster *Cluster) NewBatch() *Batch {
    return &Batch{
        cluster: cluster,
        batches: make([]nodeBatch, 0),
        index:   make([]int, 0),
    }
}

// Put add a redis command to batch, DO NOT put MGET/MSET/MSETNX.
func (batch *Batch) Put(cmd string, args ...interface{}) error {
    if len(args) < 1 {
        return fmt.Errorf("Put: no key found in args")
    }

    if cmd == "MGET" || cmd == "MSET" || cmd == "MSETNX" {
        return fmt.Errorf("Put: %s not supported", cmd)
    }

    node, err := batch.cluster.getNodeByKey(args[0])
    if err != nil {
        return fmt.Errorf("Put: %v", err)
    }

    var i int
    for i = 0; i < len(batch.batches); i++ {
        if batch.batches[i].node == node {
            batch.batches[i].cmds = append(batch.batches[i].cmds,
                nodeCommand{cmd: cmd, args: args})

            batch.index = append(batch.index, i)
            break
        }
    }
    ...
}

func (cluster *Cluster) RunBatch(bat *Batch) ([]interface{}, error) {
    for i := range bat.batches {
        go doBatch(&bat.batches[i])
    }

    for i := range bat.batches {
        <-bat.batches[i].done
    }

    var replies []interface{}
    for _, i := range bat.index {
        if bat.batches[i].err != nil {
            return nil, bat.batches[i].err
        }

        replies = append(replies, bat.batches[i].cmds[0].reply)
        bat.batches[i].cmds = bat.batches[i].cmds[1:]
    }

    return replies, nil
}

func doBatch(batch *nodeBatch) {
    conn, err := batch.node.getConn()
    ...

    for i := range batch.cmds {
        conn.send(batch.cmds[i].cmd, batch.cmds[i].args...)
    }

    err = conn.flush()
    ...

    for i := range batch.cmds {
        reply, err := conn.receive()
        if err != nil {
            batch.err = err
            conn.shutdown()
            batch.done <- 1
            return
        }

        batch.cmds[i].reply, batch.cmds[i].err = reply, err
    }

    ...
}

go-redis这个库也实现了批量key的匹配,下面是go-redis pipieline的源码。 https://github.com/go-redis

 

// xiaorui.cc

func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
        ...
	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}
		cmdsMap := map[*clusterNode][]Cmder{node: cmds}

		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				time.Sleep(c.retryBackoff(attempt))
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					cn, err := node.Client.getConn()
					if err != nil {
						if err == pool.ErrClosed {
							c.mapCmdsByNode(cmds, failedCmds)
						} else {
							setCmdsErr(cmds, err)
						}
						return
					}

					err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
					node.Client.releaseConnStrict(cn, err)
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

总结:

     推荐大家在使用redis cluster时也使用批量模式,这样对于减少网络IO延迟很有效果。pipeline还可以减少syscall消耗,毕竟数据合并了。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
go语言中的slice发布时间:2022-07-10
下一篇:
NilChannelsAlwaysBlock(Go语言中空管道总是阻塞)发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap