redis3.0之后提供了新的HA的解决方案,即Cluster模式,由多个节点组成的集群模式。集群master之间基于crc16算法,对key进行校验,得到的值对16384取余,就是key的hash slot(槽)值,每个节点各自存储一部分的hash槽值,主从节点之间基于异步复制方式同步数据。
基于redis集群的基本原理,gedis需要提供一下方面的能力:
1、统一的客户端Cluster;
2、集群连接池的实现;
3、集群节点的健康检查(后续实现);
4、负载均衡机制实现;
5、协议的封装保证对上层透明。
模型基本设计如下:
基础模型定义
/** * 节点 * master:主节点ip+port * slaves:从节点ip+port集合 */ type Node struct { Url string Pwd string InitActive int }
type ClusterConfig struct { Nodes []*Node HeartBeatInterval int }
/** * 集群客户端 * heartBeatInterval 心跳检测时间间隔,单位s * clusterPool key:连接串 value:连接池 */ type Cluster struct { config *ClusterConfig clusterPool map[string]*ConnPool } Cluster初始化
/** * 初始化Cluster client */ func NewCluster(clusterConfig ClusterConfig) *Cluster { nodes := clusterConfig.Nodes
var cluster Cluster clusterPool := make(map[string]*ConnPool)
for _, node := range nodes { var config = ConnConfig{node.Url, node.Pwd} pool, _ := NewConnPool(node.InitActive, config) clusterPool[node.Url] = pool } cluster.config = &clusterConfig cluster.clusterPool = clusterPool //初始化节点健康检测线程 defer func() { go cluster.heartBeat() }() if m==nil { m = new(sync.RWMutex) } return &cluster } 节点心跳检测
cluster创建后,开启异步线程定时轮询各个节点,向节点发出ping请求,若未响应pong,则表示当前节点异常,然后将当前节点退出连接池,并将该节点加入失败队列,定时轮询队列,检测是否恢复连接,若恢复,则重新创建连接池,从失败队列中退出当前节点。
/** * 连接池心跳检测,定时ping各个节点,ping失败的,从连接池退出,并将节点加入失败队列 * 定时轮询失败节点队列,检测节点是否已恢复连接,若恢复,则重新创建连接池,并从失败队列中移除 */ func (cluster *Cluster) heartBeat() { clusterPool := cluster.GetClusterPool() interval := cluster.config.HeartBeatInterval if interval <= 0 { interval = defaultHeartBeatInterval } var nodes = make(map[string]*Node)
for i := 0; i < len(cluster.GetClusterNodesInfo()); i++ { node := cluster.GetClusterNodesInfo()[i] nodes[node.Url] = node }
var failNodes = make(map[string]*Node) for { for url, pool := range clusterPool { result, err := executePing(pool) if err != nil { log.Printf("节点[%s] 健康检查异常,原因[%s], 节点将被移除\n", url, err) //加锁 m.Lock() time.Sleep(time.Duration(5)*time.Second) failNodes[url] = nodes[url] delete(clusterPool, url) m.Unlock() } else { log.Printf("节点[%s] 健康检查结果[%s]\n", url, result) } } //恢复检测 recover(failNodes, clusterPool)
time.Sleep(time.Duration(interval) * time.Second) } }
/** * 检测fail节点是否已恢复正常 */ func recover(failNodes map[string]*Node, clusterPool map[string]*ConnPool) { for url,node:=range failNodes{ conn := Connect(url) if conn != nil { //节点重连,恢复连接 var config = ConnConfig{url, node.Pwd} pool, _ := NewConnPool(node.InitActive, config) //加锁 m.Lock() clusterPool[node.Url] = pool delete(failNodes,url) m.Unlock() log.Printf("节点[%s] 已重连\n", url) } } } 测试结果:
loadbalance目前仅实现随机模式,每次访问前随机选择一个节点进行通信
func (cluster *Cluster) RandomSelect() *ConnPool { m.RLock() defer m.RUnlock() pools := cluster.GetClusterPool() for _,pool:= range pools{ if pool !=nil{ return pool } } fmt.Errorf("none pool can be used") return nil } 通信模块的大致流程如下:
1、cluster随机选择一个健康的节点,进行访问;
2、如果节点返回业务数据则通信结束;
3、如果节点返回的消息协议上满足“-MOVED”,例如 -MOVED 5678 127.0.0.1,则表明当前数据不在该节点;
4、重定向到redis指定的节点访问;
func (cluster *Cluster) Set(key string, value string) (interface{}, error) { result, err := executeSet(cluster.RandomSelect(), key, value) if err.Error() != protocol.MOVED { return result, err }
//重定向到新的节点 return executeSet(cluster.SelectOne(result.(string)), key, value) }
func executeSet(pool *ConnPool, key string, value string) (interface{}, error) { conn, err := GetConn(pool) if err != nil { return nil, fmt.Errorf("get conn fail") } defer pool.PutConn(conn) result := SendCommand(conn, protocol.SET, protocol.SafeEncode(key), protocol.SafeEncode(value)) return handler.HandleReply(result) } 这样,对于应用层来讲,无论访问的哪个节点,都能得到最终的结果,相对是透明的。
调用测试:
package main
import ( . "client" "net" "fmt" )
func main() { var node7000 = Node{"127.0.0.1:7000", "123456", 10} var node7001 = Node{"127.0.0.1:7001", "123456", 10} var node7002 = Node{"127.0.0.1:7002", "123456", 10} var node7003 = Node{"127.0.0.1:7003", "123456", 10} var node7004 = Node{"127.0.0.1:7004", "123456", 10} var node7005 = Node{"127.0.0.1:7005", "123456", 10}
nodes := []*Node{&node7000, &node7001, &node7002, &node7003, &node7004, &node7005} var clusterConfig = ClusterConfig{nodes,10} cluster := NewCluster(clusterConfig) value,err:=cluster.Get("name") fmt.Println(value, err) } 响应结果:
心跳检查和其他loadbalance机制后续补充实现。
项目地址:
https://github.com/zhangxiaomin1993/gedis
|
请发表评论