• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

OpenStack_Swift源代码分析——Ring的rebalance算法源代码具体分析 - xfgnongmin ...

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

OpenStack_Swift源代码分析——Ring的rebalance算法源代码具体分析

1 Command类中的rebalnace方法

在上篇文章中解说了,创建Ring已经为Ring加入设备。在加入设备后须要对Ring进行平衡,平衡

swift-ring-builder object.builder rebalance

首先会调用swift/cli/ringbuilder.py中方法,在main方法中首先会判读/etc/swift目录下是否有object.builder文件假设有就反序列化来初始化RingBuilder类。然后依据命令中的 第三个參数rebalance调用Commands类中的rebalance方法。此方法会对加入的设备进行平衡并为replica2part2dev(备份到分区到设备的映射)赋值。以下看代码的详细实现:

def rebalance(self):
        """
swift-ring-builder <builder_file> rebalance <seed>
    Attempts to rebalance the ring by reassigning partitions that haven\'t been
    recently reassigned.
        """
        def get_seed(index):
            try:
                return argv[index]
            except IndexError:
                pass

        devs_changed = builder.devs_changed
        try:
            last_balance = builder.get_balance()
            parts, balance = builder.rebalance(seed=get_seed(3))         #builder进行平衡
        except exceptions.RingBuilderError as e:
            print \'-\' * 79
            print("An error has occurred during ring validation. Common\n"
                  "causes of failure are rings that are empty or do not\n"
                  "have enough devices to accommodate the replica count.\n"
                  "Original exception message:\n %s" % e.message
                  )
            print \'-\' * 79
            exit(EXIT_ERROR)
        if not parts:
            print \'No partitions could be reassigned.\'
            print \'Either none need to be or none can be due to \' \
                  \'min_part_hours [%s].\' % builder.min_part_hours
            exit(EXIT_WARNING)
        # If we set device\'s weight to zero, currently balance will be set
        # special value(MAX_BALANCE) until zero weighted device return all
        # its partitions. So we cannot check balance has changed.
        # Thus we need to check balance or last_balance is special value.
        if not devs_changed and abs(last_balance - balance) < 1 and \
                not (last_balance == MAX_BALANCE and balance == MAX_BALANCE):
            print \'Cowardly  refusing to save rebalance as it did not change \' \
                  \'at least 1%.\'
            exit(EXIT_WARNING)
        try:
            builder.validate()
        except exceptions.RingValidationError as e:
            print \'-\' * 79
            print("An error has occurred during ring validation. Common\n"
                  "causes of failure are rings that are empty or do not\n"
                  "have enough devices to accommodate the replica count.\n"
                  "Original exception message:\n %s" % e.message
                  )
            print \'-\' * 79
            exit(EXIT_ERROR)
        print \'Reassigned %d (%.02f%%) partitions. Balance is now %.02f.\' % \
              (parts, 100.0 * parts / builder.parts, balance)
        status = EXIT_SUCCESS
        if balance > 5:
            print \'-\' * 79
            print \'NOTE: Balance of %.02f indicates you should push this \' % \
                  balance
            print \'      ring, wait at least %d hours, and rebalance/repush.\' \
                  % builder.min_part_hours
            print \'-\' * 79
            status = EXIT_WARNING
        ts = time()
        builder.get_ring().save(
            pathjoin(backup_dir, \'%d.\' % ts + basename(ring_file)))
        #存入到相应的文件中面
        builder.save(pathjoin(backup_dir, \'%d.\' % ts + basename(argv[1])))
        builder.get_ring().save(ring_file)
        builder.save(argv[1])
        exit(status)
这里我们重点看rebalance函数首先看代码详细实现

def rebalance(self, seed=None):
        """
        Rebalance the ring.
        又一次平衡 ring
        This is the main work function of the builder, as it will assign and
        reassign partitions to devices in the ring based on weights, distinct
        zones, recent reassignments,(依据他的权重、不同的zone、近期的分配) etc.

        The process doesn\'t always perfectly assign partitions (that\'d take a
        lot more analysis and therefore a lot more time -- I had code that did
        that before). Because of this, it keeps rebalancing until the device
        skew 歪斜 (number of partitions a device wants compared to what it has) gets
        below 1% or doesn\'t change by more than 1% (only happens with ring that
        can\'t be balanced no matter what -- like with 3 zones of differing
        weights with replicas set to 3).
        #低于 1%时 或者变化没有多有 1% 不用再平衡会又一次平衡
        :returns: (number_of_partitions_altered, resulting_balance)
        """

        if seed is not None:
            random.seed(seed)

        self._ring = None
        if self._last_part_moves_epoch is None:
            self._initial_balance()        #第一次平衡环 须要初始化操作
            self.devs_changed = False
            return self.parts, self.get_balance()
        retval = 0
        self._update_last_part_moves()
        last_balance = 0
        new_parts, removed_part_count = self._adjust_replica2part2dev_size()
        retval += removed_part_count
        self._reassign_parts(new_parts)
        retval += len(new_parts)
        while True:
            #要不断的又一次平衡
            reassign_parts = self._gather_reassign_parts()
            self._reassign_parts(reassign_parts)
            retval += len(reassign_parts)
            while self._remove_devs:
                self.devs[self._remove_devs.pop()[\'id\']] = None
            balance = self.get_balance()
            if balance < 1 or abs(last_balance - balance) < 1 or \
                    retval == self.parts:
                break
            last_balance = balance
        self.devs_changed = False
        self.version += 1
        return retval, balance
第一次平衡须要进行初始化操作,看_initinal_balance()函数的详细实现

 def _initial_balance(self):
        """
        Initial partition assignment is the same as rebalancing an
        existing ring, but with some initial setup beforehand(须要事先设定).
        """
        self._last_part_moves = array(\'B\', (0 for _junk in xrange(self.parts)))   #记录每个分区的变动时间
        self._last_part_moves_epoch = int(time())                                 #上一次分区变动的时间偏移

        self._reassign_parts(self._adjust_replica2part2dev_size()[0])
在上面的方法中,首先是要初始化分区的变动,開始分区变动时间都为0。然后记录时间偏移。对于

 self._reassign_parts(self._adjust_replica2part2dev_size()[0])

赋值给_reassign_parts犯法的变量是_adjust_replica2part2dev_size()返回的第一个值。这个函数的主要作用就是在调整分区数量或者备份数是返回一些新增的分区,第一次初始化话每个分区的全部备份都会被返回,一遍在_reassign_parts中作分区设备映射。我们重点看_reassing_parts函数,这个函数为做备份到分区到设备的映射最重要的函数。详细看源代码。

 def _reassign_parts(self, reassign_parts):
        """
      ressign_parts
    [(0, [0, 1, 2]), (1, [0, 1, 2]), (2, [0, 1, 2]), (3, [0, 1, 2]), (4, [0, 1, 2]), (5, [0, 1, 2]), (6, [0, 1, 2]), (7, [0, 1, 2]), (8, [0, 1, 2]), (9, [0, 1, 2]), (10, [0, 1, 2]), (11, [0, 1, 2]), (12, [0, 1, 2]), (13, [0, 1, 2]), (14, [0, 1, 2]), (15, [0, 1, 2]), (16, [0, 1, 2]), (17, [0, 1, 2]), (18, [0, 1, 2]), (19, [0, 1, 2]), (20, [0, 1, 2]), (21, [0, 1, 2]), (22, [0, 1, 2]), (23, [0, 1, 2]), (24, [0, 1, 2]), (25, [0, 1, 2]), (26, [0, 1, 2]), (27, [0, 1, 2]), (28, [0, 1, 2]), (29, [0, 1, 2]), (30, [0, 1, 2]), (31, [0, 1, 2]), (32, [0, 1, 2]), (33, [0, 1, 2]), (34, [0, 1, 2]), (35, [0, 1, 2]), (36, [0, 1, 2]), (37, [0, 1, 2]), (38, [0, 1, 2]), (39, [0, 1, 2]), (40, [0, 1, 2]), (41, [0, 1, 2]), (42, [0, 1, 2]), (43, [0, 1, 2]), (44, [0, 1, 2]), (45, [0, 1, 2]), (46, [0, 1, 2]), (47, [0, 1, 2]), (48, [0, 1, 2]), (49, [0, 1, 2]), (50, [0, 1, 2]), (51, [0, 1, 2]), (52, [0, 1, 2]), (53, [0, 1, 2]), (54, [0, 1, 2]), (55, [0, 1, 2]), (56, [0, 1, 2]), (57, [0, 1, 2]), (58, [0, 1, 2]), (59, [0, 1, 2]), (60, [0, 1, 2]), (61, [0, 1, 2]), (62, [0, 1, 2]), (63, [0, 1, 2])]
    没有元素(3,[0,1,2])  3代表第三个partion [0,1,2] 代表 三个备份的分区程序运行完成后 [0,1,2] 会映射到详细的分区 若备份数为4 则 (3,[0,1,2,3])
        For an existing ring data set, partitions are reassigned similarly to
        the initial assignment. The devices are ordered by how many partitions
        they still want and kept in that order throughout the process. The
        gathered partitions are iterated through, assigning them to devices
        according to the "most wanted" while keeping the replicas as "far
        apart" as possible(尽可能的远). Two different regions are considered the
        farthest-apart things, followed by zones, then different ip/port pairs
        within a zone(zones 有不同的 ip/port 觉得是最远的); the least-far-apart things are different devices with
        the same ip/port pair in the same zone.

        If you want more replicas than devices, you won\'t get all your
        replicas.

        :param reassign_parts: An iterable of (part, replicas_to_replace)
                               pairs. replicas_to_replace is an iterable of the
                               replica (an int) to replace for that partition.
                               replicas_to_replace may be shared for multiple
                               partitions, so be sure you do not modify it.
        """
        for dev in self._iter_devs():
            #(dev[\'parts_wanted\'], random.randint(0, 0xFFFF), dev[\'id\'])
            # (part_want,int,id)
            dev[\'sort_key\'] = self._sort_key_for(dev)
            dev[\'tiers\'] = tiers_for_dev(dev)
            """
            print dev[\'sort_key\']
            print dev[\'tiers\']
            (126, 25509, 0)
            ((0,), (0, 0), (0, 0, \'127.0.0.1:6000\'), (0, 0, \'127.0.0.1:6000\', 0))
            (126, 55891, 1)
            ((1,), (1, 1), (1, 1, \'127.0.0.2:6001\'), (1, 1, \'127.0.0.2:6001\', 1))
            (3, 24684, 2)
            ((2,), (2, 3), (2, 3, \'127.0.0.1:6000\'), (2, 3, \'127.0.0.1:6000\', 2))
            (126。25509。0)(dev[\'parts_wanted\'], random.randint(0, 0xFFFF), dev[\'id\'])
            """
            #sorted 函数会按d[\'sort_key\']中(a,b,c)a 升序排,a同样 则按b,b同样则按c 以此类推  (part_want,int,id)
        available_devs = \
            sorted((d for d in self._iter_devs() if d[\'weight\']),
                   key=lambda x: x[\'sort_key\'])


        """
        [{\'tiers\': ((2,), (2, 3), (2, 3, \'127.0.0.1:6000\'), (2, 3, \'127.0.0.1:6000\', 2)), \'replication_port\': 6000, \'zone\': 3, \'weight\': 3.14159265359, \'sort_key\': (3, 57095, 2), \'ip\': \'127.0.0.1\', \'region\': 2, \'parts\': 0, \'id\': 2, \'replication_ip\': \'127.0.0.1\', \'meta\': \'some meta data\', \'device\': \'sda3\', \'parts_wanted\': 3, \'port\': 6000},
        {\'tiers\': ((0,), (0, 0), (0, 0, \'127.0.0.1:6000\'), (0, 0, \'127.0.0.1:6000\', 0)), \'replication_port\': 6000, \'weight\': 100.0, \'zone\': 0, \'sort_key\': (126, 14320, 0), \'ip\': \'127.0.0.1\', \'region\': 0, \'id\': 0, \'replication_ip\': \'127.0.0.1\', \'parts\': 0, \'meta\': \'some meta data\', \'device\': \'sda1\', \'parts_wanted\': 126, \'port\': 6000},
        {\'tiers\': ((1,), (1, 1), (1, 1, \'127.0.0.2:6001\'), (1, 1, \'127.0.0.2:6001\', 1)), \'replication_port\': 6001, \'weight\': 100.0, \'zone\': 1, \'sort_key\': (126, 26347, 1), \'ip\': \'127.0.0.2\', \'region\': 1, \'id\': 1, \'replication_ip\': \'127.0.0.2\', \'parts\': 0, \'meta\': \'\', \'device\': \'sda2\', \'parts_wanted\': 126, \'port\': 6001}]

        """
        #java 新增此功能
        #(0,1):list
        tier2devs = defaultdict(list)
        tier2sort_key = defaultdict(tuple)
        tier2dev_sort_key = defaultdict(list)
        max_tier_depth = 0
        #available_devs 已经排序
        for dev in available_devs:
            for tier in dev[\'tiers\']:
                #对一个tier2devs[tier] 同一个key 有多个dev 且 dev是按sort_key 升序排列的
                #(1,2)
                tier2devs[tier].append(dev)  # <-- starts out sorted!
                #tier2dev_sort_key 同一个key 相应多个 dev[sort_key]
                tier2dev_sort_key[tier].append(dev[\'sort_key\'])
                #对于tier2sort_key 中在迭代时 对于tier同样,最后会是dev[sort_key]最后的一个(也就是最大的一个 一般为 dev[sort_key]中dev[part_want]最大的,相应part_want同样的则会依据random.randint(0, 0xFFFF)排序。由于random.randint(0, 0xFFFF)是个随机值,也就是说会选择random.randint(0, 0xFFFF)产生的值最大的,对于partion选择设备时,对于part_want同样的,random.randint(0, 0xFFFF)大的会被优先被选上,这样也保证了在同样 part_want值的设备中,partion随机选择一个设备)
                tier2sort_key[tier] = dev[\'sort_key\']
                #求最长的tier  (1, 1, \'127.0.0.2:6001\', 1) 最长
                if len(tier) > max_tier_depth:
                    max_tier_depth = len(tier)
        #没一个tier 相应的子tier 如 () 相应的子tier为(1)。(2)。(3)
        tier2children_sets = build_tier_tree(available_devs)
        tier2children = defaultdict(list)
        tier2children_sort_key = {}
        tiers_list = [()]
        depth = 1
        while depth <= max_tier_depth:
            new_tiers_list = []
            #開始 tier 为()
            for tier in tiers_list:
                child_tiers = list(tier2children_sets[tier])
                #比方() 相应的child_tiers 为[(1,),(2,),(3)] 则会按每个孩子的sort_key 排序比如
                #tier2sort_key[\'(1,)\'] = (1,2,3)
                #tier2sort_key[\'(2,)\'] = (3,1,3)
                #tier2sort_key[\'(3,)\'] = (2,1,3)
                #排序后为 [(1,),(3,),(2)]
                #(1,)中的sort_key 为其全部child_tiers中最大的一个,(2,),(3,)也是如此
                child_tiers.sort(key=tier2sort_key.__getitem__)
                #tier2children 每个key相应value中的值是按 sort_key 排序的
                tier2children[tier] = child_tiers

                tier2children_sort_key[tier] = map(
                    tier2sort_key.__getitem__, child_tiers)
                new_tiers_list.extend(child_tiers)
            #tiers_list 最后中的元素为(2, 3, \'127.0.0.1:6000\', 2),即每一设备的(region,zone,ip:port,id)
            tiers_list = new_tiers_list
            depth += 1
        #(1, [0, 1, 2])  2**power
        for part, replace_replicas in reassign_parts:
            # Gather up(收集起来) what other tiers (regions, zones, ip/ports, and
            # devices) the replicas not-to-be-moved are in for this part.
            other_replicas = defaultdict(int)
            unique_tiers_by_tier_len = defaultdict(set)
            #_replicas_for_part  每个part相应的 replica 为一个list
            for replica in self._replicas_for_part(part):
                #对于当前的part,假设该备份没有被收集
                if replica not in replace_replicas:
                    dev = self.devs[self._replica2part2dev[replica][part]]
                    for tier in dev[\'tiers\']:
                        other_replicas[tier] += 1
                        unique_tiers_by_tier_len[len(tier)].add(tier)

            for replica in replace_replicas:
                tier = ()
                depth = 1
                #此循环为tiers 排序 依据 有多少当前分区的备份数
                while depth <= max_tier_depth:
                    # Order the tiers by how many replicas of this
                    # partition they already have(对tiers进行排序是依据他们分配了多少分区).
                    # Then, of the ones
                    # with the smallest number of replicas, pick the
                    # tier with the hungriest drive and then continue
                    # searching in that subtree.(因此有最少备份的分区,将会选择最饥饿的设备)
                    #
                    # There are other strategies we could use here,
                    # such as hungriest-tier(最饥渴的) (i.e. biggest
                    # sum-of-parts-wanted) or picking one at random.
                    # However, hungriest-drive is what was used here
                    # before, and it worked pretty well in practice.
                    #
                    # Note that this allocator will balance things as
                    # evenly as possible at each level of the device
                    # layout(在设备的每一层尽可能的平衡).
                    # If your layout is extremely unbalanced,
                    # this may produce poor results.
                    #
                    # This used to be a cute(聪明), recursive(递归) function, but it\'s been
                    # unrolled(扩展) for performance.

                    # We sort the tiers here so that, when we look for a tier
                    # with the lowest number of replicas, the first one we
                    # find is the one with the hungriest drive (i.e. drive
                    # with the largest sort_key value). This lets us
                    # short-circuit(环) the search while still ensuring we get the
                    # right tier
                    #candidate 候选
                    candidates_with_replicas = \
                        unique_tiers_by_tier_len[len(tier) + 1]
                    # Find a tier with the minimal replica count and the
                    # hungriest drive among all the tiers with the minimal
                    # replica count(找到replica最少的tier 并在这些replica最少的tier中找到最饥饿的drive).
                    if len(tier2children[tier]) > \
                            len(candidates_with_replicas):
                        # There exists at least one tier with 0 other replicas
                        tier = max((t for t in tier2children[tier]
                                    if other_replicas[t] == 0),
                                   key=tier2sort_key.__getitem__)
                    else:
                        tier = max(tier2children[tier],
                                   key=lambda t: (-other_replicas[t],
                                                  tier2sort_key[t]))
                    depth += 1
                #上边几步会选择出来一个 tier, 有待认真理解



                #tier2devs[tier]相应的值是按 sort_key升序排列的 所以此时的dev肯定是sort_key 最大的dev
                dev = tier2devs[tier][-1]
                dev[\'parts_wanted\'] -= 1
                dev[\'parts\'] += 1
                old_sort_key = dev[\'sort_key\']
                #更新 part_wanted 和parts后会又一次生成一个new_sort_key
                new_sort_key = dev[\'sort_key\'] = self._sort_key_for(dev)
                #由于dev的part_wanted 和 parts 变了 所以要做一些又一次插入操作
                for tier in dev[\'tiers\']:
                    other_replicas[tier] += 1
                    unique_tiers_by_tier_len[len(tier)].add(tier)
                    #返回 old_sort_key 在tier2dev_sort_key[tier]中插入的位置 不会插入 。主要是找到旧的sort_key 在 tier2dev_sort_key中的位置
                    index = bisect.bisect_left(tier2dev_sort_key[tier],
                                               old_sort_key)
                    #下边两行是把tier2devs和tier2dev_sort_key 之前旧的sort_key 相应的dev删去
                    tier2devs[tier].pop(index)
                    tier2dev_sort_key[tier].pop(index)
                    #依据新的的sort_key 找到dev应该插入的新位置 并插入的tier2devs和tier2dev_sort_key新的位置上去
                    new_index = bisect.bisect_left(tier2dev_sort_key[tier],
                                                   new_sort_key)
                    tier2devs[tier].insert(new_index, dev)
                    tier2dev_sort_key[tier].insert(new_index, new_sort_key)

                    #更新tier2sort_key 是teir2sort_key 为相应tier中sort_key为最大的
                    new_last_sort_key = tier2dev_sort_key[tier][-1]
                    tier2sort_key[tier] = new_last_sort_key

                    # Now jiggle(轻摇 摇动) tier2children values to keep them sorted
                    #找到上一级tier 如(1,2)相应的上一级为(1,)
                    parent_tier = tier[0:-1]
                    #获得上一级tier在其上一级上一级的tier中相应的位置
                    index = bisect.bisect_left(
                        tier2children_sort_key[parent_tier],
                        old_sort_key)
                    #pop出来这个tier并得到这个tier
                    popped = tier2children[parent_tier].pop(index)
                    #从tier2children_sort_key中pop出旧的sort_key
                    tier2children_sort_key[parent_tier].pop(index)

                    #依据新的最大的sort_key找到当前tier在其上一级tier中应该插入的位置
                    new_index = bisect.bisect_left(
                        tier2children_sort_key[parent_tier],
                        new_last_sort_key)
                    #在tier2children新位置中插入上一步pop出来的tier
                    tier2children[parent_tier].insert(new_index, popped)
                    #更新本一级在上一级的tier2children中的插入位置
                    tier2children_sort_key[parent_tier].insert(
                        new_index, new_last_sort_key)

                self._replica2part2dev[replica][part] = dev[\'id\']
这个函数就是_replica2part2dev进行赋值,在这个赋值过程中,须要考虑允许分区相应的副本不能在一个zone里面,并且在选择详细设备时首先要选择part_wants值最大的,过程比較复杂,须要好好理解。

在rebalance方法运行完成后。RingBuilder实例的各个属性就被附上了值,我们回到Command类的rebalance,在经过一些try....except后。会将数据再次序列化,此时序列化,须要将数据序列化到两个文件中,第一个是create和add方法序列化存入的object.builder文件。这里面存入的数据为创建Ring已经维护Ring的详细数据,还有一个文件为object.ring.gz这里面存入的数据为系统使用Ring时的数据。详细看代码片段:

  builder.get_ring().save(
            pathjoin(backup_dir, \'%d.\' % ts + basename(ring_file)))
        #存入到相应的文件中面
        builder.save(pathjoin(backup_dir, \'%d.\' % ts + basename(argv[1])))
        builder.get_ring().save(ring_file)
当中builer调用get_ring方法获得一个RingData实例。让后将RingData实例序列化(将数据存入到object.ring.gz中),我们看看RingData实例__init__函数,看看它存到磁盘上的究竟为什么数据。

def __init__(self, replica2part2dev_id, devs, part_shift):
        self.devs = devs
        self._replica2part2dev_id = replica2part2dev_id
        self._part_shift = part_shift
        #dev 格式为dev0 = {\'region\': 1, \'zone\': 1, \'ip\': \'192.168.1.1\',
        #            \'port\': \'6000\', \'id\': 0}
        for dev in self.devs:
            if dev is not None:
                dev.setdefault("region", 1)
从上面函数能够看出,其序列化的事实上就是replica2part2dev_id(备份到分区到设备的映射), devs(全部的设备), part_shift(右移的位数)。

至此rebalance解说完成。以下看外部函数怎样使用Ring中的数据。

2 Ring数据的使用

第一节讲到rebalance后。对外提供的数据会此存在object.ring.gz文件中,数据是怎样调用的那,我们再看swift/commom/ring/ring.py中的Ring类。此类提供数据调用的接口。详细看代码:

class Ring(object):
    """
    Partitioned consistent hashing ring(分区一致性hash).

    :param serialized_path: path to serialized RingData instance
    :param reload_time: time interval(间隔) in seconds to check for a ring change
    """

    def __init__(self, serialized_path, reload_time=15, ring_name=None):
        # can\'t use the ring unless HASH_PATH_SUFFIX(后缀 下标) is set
        validate_configuration()
        if ring_name:
            self.serialized_path = os.path.join(serialized_path,
                                                ring_name + \'.ring.gz\')
        else:
            self.serialized_path = os.path.join(serialized_path)
        self.reload_time = reload_time
        self._reload(force=True)

    def _reload(self, force=False):
        self._rtime = time() + self.reload_time
        if force or self.has_changed():
            ring_data = RingData.load(self.serialized_path)
            self._mtime = getmtime(self.serialized_path)
            self._devs = ring_data.devs
            # NOTE(akscram): Replication parameters like replication_ip
            #                and replication_port are required for
            #                replication process. An old replication
            #                ring doesn\'t contain this parameters into
            #                device. Old-style pickled rings won\'t have
            #                region information.
            for dev in self._devs:
                if dev:
                    dev.setdefault(\'region\', 1)
                    if \'ip\' in dev:
                        dev.setdefault(\'replication_ip\', dev[\'ip\'])
                    if \'port\' in dev:
                        dev.setdefault(\'replication_port\', dev[\'port\'])

            self._replica2part2dev_id = ring_data._replica2part2dev_id
            self._part_shift = ring_data._part_shift
            self._rebuild_tier_data()

            # Do this now, when we know the data has changed, rather then
            # doing it on every call to get_more_nodes().
            regions = set()
            zones = set()
            ip_ports = set()
            self._num_devs = 0
            for dev in self._devs:
                if dev:
                    regions.add(dev[\'region\'])
                    zones.add((dev[\'region\'], dev[\'zone\']))
                    ip_ports.add((dev[\'region\'], dev[\'zone\'],
                                  dev[\'ip\'], dev[\'port\']))
                    self._num_devs += 1
            self._num_regions = len(regions)
            self._num_zones = len(zones)
            self._num_ip_ports = len(ip_ports)

    def _rebuild_tier_data(self):
        self.tier2devs = defaultdict(list)
        for dev in self._devs:
            if not dev:
                continue
            for tier in tiers_for_dev(dev):
                self.tier2devs[tier].append(dev)

        tiers_by_length = defaultdict(list)
        for tier in self.tier2devs:
            tiers_by_length[len(tier)].append(tier)
        self.tiers_by_length = sorted(tiers_by_length.values(),
                                      key=lambda x: len(x[0]))
        for tiers in self.tiers_by_length:
            tiers.sort()

    @property
    def replica_count(self):
        """Number of replicas (full or partial) used in the ring."""
        return len(self._replica2part2dev_id)

    @property
    def partition_count(self):
        """Number of partitions in the ring."""
        return len(self._replica2part2dev_id[0])

    @property
    def devs(self):
        """devices in the ring"""
        #15秒 扫描一次
        if time() > self._rtime:
            self._reload()
        return self._devs

    def has_changed(self):
        """
        Check to see if the ring on disk is different than the current one in
        memory.

        :returns: True if the ring on disk has changed, False otherwise
        """
        return getmtime(self.serialized_path) != self._mtime

    def _get_part_nodes(self, part):
        part_nodes = []
        seen_ids = set()
        for r2p2d in self._replica2part2dev_id:
            if part < len(r2p2d):
                dev_id = r2p2d[part]
                if dev_id not in seen_ids:
                    part_nodes.append(self.devs[dev_id])
                seen_ids.add(dev_id)
        return part_nodes

#   common/db_replicator.py _replicate_object 用到
    def get_part(self, account, container=None, obj=None):
        """
        Get the partition for an account/container/object.

        :param account: account name
        :param container: container name
        :param obj: object name
        :returns: the partition number
        """
        key = hash_path(account, container, obj, raw_digest=True)
        if time() > self._rtime:
            self._reload()
        part = struct.unpack_from(\'>I\', key)[0] >> self._part_shift
        return part

    def get_part_nodes(self, part):
        """
        Get the nodes that are responsible for the partition. If one
        node is responsible for more than one replica of the same
        partition, it will only appear in the output once.

        :param part: partition to get nodes for
        :returns: list of node dicts

        See :func:`get_nodes` for a description of the node dicts.
        """

        if time() > self._rtime:
            self._reload()
        return self._get_part_nodes(part)

    def get_nodes(self, account, container=None, obj=None):
        """
        Get the partition and nodes for an account/container/object.
        If a node is responsible for more than one replica, it will
        only appear in the output once.

        :param account: account name
        :param container: container name
        :param obj: object name
        :returns: a tuple of (partition, list of node dicts)

        Each node dict will have at least the following keys:

        ======  ===============================================================
        id      unique integer identifier amongst devices
        weight  a float of the relative weight of this device as compared to
                others; this indicates how many partitions the builder will try
                to assign to this device
        zone    integer indicating which zone the device is in; a given
                partition will not be assigned to multiple devices within the
                same zone
        ip      the ip address of the device
        port    the tcp port of the device
        device  the device\'s name on disk (sdb1, for example)
        meta    general use \'extra\' field; for example: the online date, the
                hardware description
        ======  ===============================================================
        """
        part = self.get_part(account, container, obj)
        return part, self._get_part_nodes(part)


#当一个节点失败时。replicator会利用此方法,来获一个替代它的节点
    def get_more_nodes(self, part):
        """
        Generator to get extra nodes for a partition for hinted handoff(由于为守护进程在出来 故为私下转移).
        #handoff nodes 尝试处于primary primary_nodes 之外的 zones中
        The handoff nodes will try to be in zones other than the
        primary zones, will take into account the device weights, and
        will usually keep the same sequences of handoffs even with
        ring changes.要是handoffs 处于同样的序列中即使ring 改变了

        :param part: partition to get handoff nodes for
        :returns: generator of node dicts

        See :func:`get_nodes` for a description of the node dicts.
        """
        if time() > self._rtime:
            self._reload()
        primary_nodes = self._get_part_nodes(part)

        used = set(d[\'id\'] for d in primary_nodes)
        same_regions = set(d[\'region\'] for d in primary_nodes)
        same_zones = set((d[\'region\'], d[\'zone\']) for d in primary_nodes)
        same_ip_ports = set((d[\'region\'], d[\'zone\'], d[\'ip\'], d[\'port\'])
                            for d in primary_nodes)
        #parts=2**power
        parts = len(self._replica2part2dev_id[0])
        # unpack_from(fmt, buffer, offset=0): # known case of _struct.unpack_from
        #Unpack the buffer, containing packed C structure data, according to
        #fmt, starting at offset. Requires len(buffer[offset:]) >= calcsize(fmt).
        #>I I代表整数, >I 表示大端存储
        start = struct.unpack_from(
            \'>I\', md5(str(part)).digest())[0] >> self._part_shift
        #步长
        inc = int(parts / 65536) or 1
        # Multiple loops for execution speed; the checks and bookkeeping get
        # simpler as you go along
        #hit 命中
        hit_all_regions = len(same_regions) == self._num_regions
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_regions:
                # At this point, there are no regions left untouched, so we
                # can stop looking.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    region = dev[\'region\']
                    if dev_id not in used and region not in same_regions:
                        yield dev
                        used.add(dev_id)
                        same_regions.add(region)
                        zone = dev[\'zone\']
                        ip_port = (region, zone, dev[\'ip\'], dev[\'port\'])
                        same_zones.add((region, zone))
                        same_ip_ports.add(ip_port)
                        if len(same_regions) == self._num_regions:
                            hit_all_regions = True
                            break

        hit_all_zones = len(same_zones) == self._num_zones
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_zones:
                # Much like we stopped looking for fresh regions before, we
                # can now stop looking for fresh zones; there are no more.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    zone = (dev[\'region\'], dev[\'zone\'])
                    if dev_id not in used and zone not in same_zones:
                        yield dev
                        used.add(dev_id)
                        same_zones.add(zone)
                        ip_port = zone + (dev[\'ip\'], dev[\'port\'])
                        same_ip_ports.add(ip_port)
                        if len(same_zones) == self._num_zones:
                            hit_all_zones = True
                            break

        hit_all_ip_ports = len(same_ip_ports) == self._num_ip_ports
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_ip_ports:
                # We\'ve exhausted(耗尽) the pool of unused backends, so stop
                # looking.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    ip_port = (dev[\'region\'], dev[\'zone\'],
                               dev[\'ip\'], dev[\'port\'])
                    if dev_id not in used and ip_port not in same_ip_ports:
                        yield dev
                        used.add(dev_id)
                        same_ip_ports.add(ip_port)
                        if len(same_ip_ports) == self._num_ip_ports:
                            hit_all_ip_ports = True
                            break

        hit_all_devs = len(used) == self._num_devs
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_devs:
                # We\'ve used every device we have, so let\'s stop looking for
                # unused devices now.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    if dev_id not in used:
                        yield self._devs[dev_id]
                        used.add(dev_id)
                        if len(used) == self._num_devs:
                            hit_all_devs = True
                            break
当中:

get_part(self, account, container=None, obj=None)

def get_part_nodes(self, part):

def get_nodes(self, account, container=None, obj=None):

def get_more_nodes(self, part):


通过反序列化哈.ring.gz中的数据。然后利用上面的几个函数接口利用反序列化回来的数据。以下我们在回过头看看OpenStack_Swift源代码分析——Ring基本原理及一致性Hash算法中的最后一张图

                                                     

                                                                                                                       图1 环的运行机制

利用上图指示的关系。假如我们要存入一个对象,首先通过对象的account/container/object的md5值的前四个字节(32位)右移动part_shift位,得到详细对象相应的分区号,然后通过replica2part2dev_id(备份到分区到设备的映射), 找到当前分区相应的三个备份设备的Id。获得id后从devs里面得到详细的dev,因dev里面存有设备的ip,port,以及存储数据的磁盘,分别创建请求这三个设备,将数据存入设备上。

至此本节解说完成,在兴许章节中我将会讲述加入删除设备后,怎样维护环中的映射关系。

由于本人水平有限。文中难免出现理解错误。敬请指正、交流,谢谢!


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap