OpenStack_Swift Source Code Analysis — The Ring rebalance Algorithm in Detail
1 The rebalance Method of the Commands Class
The previous article covered creating a Ring and adding devices to it. Once devices have been added, the Ring must be rebalanced:
swift-ring-builder object.builder rebalance
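For context, a typical builder workflow leading up to this point might look like the following; the partition power, replica count, min_part_hours, device strings, and weights here are illustrative, not taken from the article:

swift-ring-builder object.builder create 18 3 1
swift-ring-builder object.builder add r1z1-127.0.0.1:6010/sda1 100
swift-ring-builder object.builder add r1z2-127.0.0.2:6020/sdb1 100
swift-ring-builder object.builder add r1z3-127.0.0.3:6030/sdc1 100
swift-ring-builder object.builder rebalance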
The rebalance command first enters main() in swift/cli/ringbuilder.py, which checks whether the object.builder file exists under /etc/swift; if it does, the file is deserialized to initialize a RingBuilder instance. Then, based on the command's third argument, rebalance, the rebalance method of the Commands class is invoked. This method balances the partitions across the added devices and fills in _replica2part2dev (the replica-to-partition-to-device mapping). Here is the implementation:
def rebalance(self):
    """
swift-ring-builder <builder_file> rebalance <seed>
    Attempts to rebalance the ring by reassigning partitions that haven't been
    recently reassigned.
    """
    def get_seed(index):
        try:
            return argv[index]
        except IndexError:
            pass

    devs_changed = builder.devs_changed
    try:
        last_balance = builder.get_balance()
        parts, balance = builder.rebalance(seed=get_seed(3))  # rebalance the builder
    except exceptions.RingBuilderError as e:
        print '-' * 79
        print("An error has occurred during ring validation. Common\n"
              "causes of failure are rings that are empty or do not\n"
              "have enough devices to accommodate the replica count.\n"
              "Original exception message:\n %s" % e.message)
        print '-' * 79
        exit(EXIT_ERROR)
    if not parts:
        print 'No partitions could be reassigned.'
        print 'Either none need to be or none can be due to ' \
              'min_part_hours [%s].' % builder.min_part_hours
        exit(EXIT_WARNING)
    # If we set device's weight to zero, currently balance will be set
    # special value(MAX_BALANCE) until zero weighted device return all
    # its partitions. So we cannot check balance has changed.
    # Thus we need to check balance or last_balance is special value.
    if not devs_changed and abs(last_balance - balance) < 1 and \
            not (last_balance == MAX_BALANCE and balance == MAX_BALANCE):
        print 'Cowardly refusing to save rebalance as it did not change ' \
              'at least 1%.'
        exit(EXIT_WARNING)
    try:
        builder.validate()
    except exceptions.RingValidationError as e:
        print '-' * 79
        print("An error has occurred during ring validation. Common\n"
              "causes of failure are rings that are empty or do not\n"
              "have enough devices to accommodate the replica count.\n"
              "Original exception message:\n %s" % e.message)
        print '-' * 79
        exit(EXIT_ERROR)
    print 'Reassigned %d (%.02f%%) partitions. Balance is now %.02f.' % \
          (parts, 100.0 * parts / builder.parts, balance)
    status = EXIT_SUCCESS
    if balance > 5:
        print '-' * 79
        print 'NOTE: Balance of %.02f indicates you should push this ' % \
              balance
        print '      ring, wait at least %d hours, and rebalance/repush.' \
              % builder.min_part_hours
        print '-' * 79
        status = EXIT_WARNING
    ts = time()
    builder.get_ring().save(
        pathjoin(backup_dir, '%d.' % ts + basename(ring_file)))
    # write timestamped backups, then the live files
    builder.save(pathjoin(backup_dir, '%d.' % ts + basename(argv[1])))
    builder.get_ring().save(ring_file)
    builder.save(argv[1])
    exit(status)
Now let's focus on the builder's rebalance function itself. First, the implementation:
def rebalance(self, seed=None):
    """
    Rebalance the ring.

    This is the main work function of the builder, as it will assign and
    reassign partitions to devices in the ring based on weights, distinct
    zones, recent reassignments, etc.

    The process doesn't always perfectly assign partitions (that'd take a
    lot more analysis and therefore a lot more time -- I had code that did
    that before). Because of this, it keeps rebalancing until the device
    skew (number of partitions a device wants compared to what it has) gets
    below 1% or doesn't change by more than 1% (only happens with ring that
    can't be balanced no matter what -- like with 3 zones of differing
    weights with replicas set to 3).

    :returns: (number_of_partitions_altered, resulting_balance)
    """
    if seed is not None:
        random.seed(seed)

    self._ring = None
    if self._last_part_moves_epoch is None:
        self._initial_balance()  # the first rebalance needs initial setup
        self.devs_changed = False
        return self.parts, self.get_balance()
    retval = 0
    self._update_last_part_moves()
    last_balance = 0
    new_parts, removed_part_count = self._adjust_replica2part2dev_size()
    retval += removed_part_count
    self._reassign_parts(new_parts)
    retval += len(new_parts)
    while True:
        # keep rebalancing until the skew stops improving
        reassign_parts = self._gather_reassign_parts()
        self._reassign_parts(reassign_parts)
        retval += len(reassign_parts)
        while self._remove_devs:
            self.devs[self._remove_devs.pop()['id']] = None
        balance = self.get_balance()
        if balance < 1 or abs(last_balance - balance) < 1 or \
                retval == self.parts:
            break
        last_balance = balance
    self.devs_changed = False
    self.version += 1
    return retval, balance
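The loop stops once the balance falls below 1 or stops changing by at least 1. The balance value is the worst percentage deviation of any device from its weight-proportional share of partitions. The function below is a rough sketch of that idea in my own words, not Swift's actual RingBuilder.get_balance (which, as the comment in the Commands method noted, also returns the special value MAX_BALANCE for zero-weight devices that still hold partitions):

def sketch_get_balance(devs, parts, replicas):
    # worst deviation, in percent, of any device's assigned partition
    # count from the count its weight entitles it to
    total_weight = sum(d['weight'] for d in devs)
    worst = 0
    for d in devs:
        desired = parts * replicas * d['weight'] / total_weight
        worst = max(worst, abs(100.0 * d['parts'] / desired - 100.0))
    return worst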
The first rebalance requires some initialization; here is the implementation of _initial_balance():
def _initial_balance(self):
    """
    Initial partition assignment is the same as rebalancing an
    existing ring, but with some initial setup beforehand.
    """
    # one byte per partition, recording when each partition last moved
    self._last_part_moves = array('B', (0 for _junk in xrange(self.parts)))
    # the epoch from which partition-move times are measured
    self._last_part_moves_epoch = int(time())

    self._reassign_parts(self._adjust_replica2part2dev_size()[0])
In this method, the per-partition move records are initialized first (every partition's move time starts at 0), and the epoch offset is recorded. In the call

self._reassign_parts(self._adjust_replica2part2dev_size()[0])

the argument handed to _reassign_parts is the first value returned by _adjust_replica2part2dev_size(). That function's main job is to return the newly added partitions whenever the partition count or the replica count changes; on the very first rebalance, every replica of every partition is returned, so that _reassign_parts can map them all to devices. Now let's focus on _reassign_parts, the most important function for building the replica-to-partition-to-device mapping. Here is the source:
def _reassign_parts(self, reassign_parts):
    """
    reassign_parts looks like:
    [(0, [0, 1, 2]), (1, [0, 1, 2]), (2, [0, 1, 2]), ..., (63, [0, 1, 2])]
    In an element such as (3, [0, 1, 2]), 3 is the partition index and
    [0, 1, 2] are that partition's replica indices; once this function has
    run, each replica in [0, 1, 2] is mapped to a concrete device. With a
    replica count of 4 the element would be (3, [0, 1, 2, 3]).

    For an existing ring data set, partitions are reassigned similarly to
    the initial assignment. The devices are ordered by how many partitions
    they still want and kept in that order throughout the process. The
    gathered partitions are iterated through, assigning them to devices
    according to the "most wanted" while keeping the replicas as "far
    apart" as possible. Two different regions are considered the
    farthest-apart things, followed by zones, then different ip/port pairs
    within a zone; the least-far-apart things are different devices with
    the same ip/port pair in the same zone.

    If you want more replicas than devices, you won't get all your
    replicas.

    :param reassign_parts: An iterable of (part, replicas_to_replace)
                           pairs. replicas_to_replace is an iterable of the
                           replica (an int) to replace for that partition.
                           replicas_to_replace may be shared for multiple
                           partitions, so be sure you do not modify it.
    """
    for dev in self._iter_devs():
        # sort_key is the tuple
        # (dev['parts_wanted'], random.randint(0, 0xFFFF), dev['id'])
        dev['sort_key'] = self._sort_key_for(dev)
        dev['tiers'] = tiers_for_dev(dev)
        """
        Example output of
        print dev['sort_key']
        print dev['tiers']
        for three devices:
        (126, 25509, 0)
        ((0,), (0, 0), (0, 0, '127.0.0.1:6000'), (0, 0, '127.0.0.1:6000', 0))
        (126, 55891, 1)
        ((1,), (1, 1), (1, 1, '127.0.0.2:6001'), (1, 1, '127.0.0.2:6001', 1))
        (3, 24684, 2)
        ((2,), (2, 3), (2, 3, '127.0.0.1:6000'), (2, 3, '127.0.0.1:6000', 2))
        Here (126, 25509, 0) is
        (dev['parts_wanted'], random.randint(0, 0xFFFF), dev['id']).
        """

    # sorted() orders by the d['sort_key'] tuples (a, b, c): ascending by a,
    # with ties broken by b and then by c -- i.e. by
    # (parts_wanted, random tiebreak, id)
    available_devs = \
        sorted((d for d in self._iter_devs() if d['weight']),
               key=lambda x: x['sort_key'])
    """
    available_devs now looks like:
    [{'tiers': ((2,), (2, 3), (2, 3, '127.0.0.1:6000'), (2, 3, '127.0.0.1:6000', 2)), 'replication_port': 6000, 'zone': 3, 'weight': 3.14159265359, 'sort_key': (3, 57095, 2), 'ip': '127.0.0.1', 'region': 2, 'parts': 0, 'id': 2, 'replication_ip': '127.0.0.1', 'meta': 'some meta data', 'device': 'sda3', 'parts_wanted': 3, 'port': 6000},
     {'tiers': ((0,), (0, 0), (0, 0, '127.0.0.1:6000'), (0, 0, '127.0.0.1:6000', 0)), 'replication_port': 6000, 'weight': 100.0, 'zone': 0, 'sort_key': (126, 14320, 0), 'ip': '127.0.0.1', 'region': 0, 'id': 0, 'replication_ip': '127.0.0.1', 'parts': 0, 'meta': 'some meta data', 'device': 'sda1', 'parts_wanted': 126, 'port': 6000},
     {'tiers': ((1,), (1, 1), (1, 1, '127.0.0.2:6001'), (1, 1, '127.0.0.2:6001', 1)), 'replication_port': 6001, 'weight': 100.0, 'zone': 1, 'sort_key': (126, 26347, 1), 'ip': '127.0.0.2', 'region': 1, 'id': 1, 'replication_ip': '127.0.0.2', 'parts': 0, 'meta': '', 'device': 'sda2', 'parts_wanted': 126, 'port': 6001}]
    """
    # tier -> list of devs in that tier, e.g. key (0, 1) -> [dev, ...]
    tier2devs = defaultdict(list)
    tier2sort_key = defaultdict(tuple)
    tier2dev_sort_key = defaultdict(list)
    max_tier_depth = 0
    # available_devs is already sorted by sort_key
    for dev in available_devs:
        for tier in dev['tiers']:
            # tier2devs[tier]: one key maps to several devs, and the devs
            # stay in ascending sort_key order
            tier2devs[tier].append(dev)  # <-- starts out sorted!
            # tier2dev_sort_key: one key maps to the sort_keys of those devs
            tier2dev_sort_key[tier].append(dev['sort_key'])
            # As we iterate, tier2sort_key[tier] ends up holding the last
            # (i.e. largest) dev['sort_key'] for that tier -- normally the
            # one with the largest parts_wanted. Ties on parts_wanted are
            # broken by random.randint(0, 0xFFFF), so among equally hungry
            # devices the one with the larger random value is preferred;
            # this lets a partition pick a random device among devices with
            # the same parts_wanted.
            tier2sort_key[tier] = dev['sort_key']
            # track the deepest tier, e.g. (1, 1, '127.0.0.2:6001', 1)
            if len(tier) > max_tier_depth:
                max_tier_depth = len(tier)

    # the child tiers of every tier; e.g. the children of () are
    # (1,), (2,), (3,)
    tier2children_sets = build_tier_tree(available_devs)
    tier2children = defaultdict(list)
    tier2children_sort_key = {}
    tiers_list = [()]
    depth = 1
    while depth <= max_tier_depth:
        new_tiers_list = []
        # start from the root tier ()
        for tier in tiers_list:
            child_tiers = list(tier2children_sets[tier])
            # e.g. the child_tiers of () are [(1,), (2,), (3,)]; they get
            # sorted by each child's sort_key. For instance, with
            # tier2sort_key[(1,)] = (1, 2, 3)
            # tier2sort_key[(2,)] = (3, 1, 3)
            # tier2sort_key[(3,)] = (2, 1, 3)
            # the sorted result is [(1,), (3,), (2,)].
            # The sort_key of (1,) is the largest among all of its own
            # child tiers; likewise for (2,) and (3,).
            child_tiers.sort(key=tier2sort_key.__getitem__)
            # each value in tier2children is kept sorted by sort_key
            tier2children[tier] = child_tiers
            tier2children_sort_key[tier] = map(
                tier2sort_key.__getitem__, child_tiers)
            new_tiers_list.extend(child_tiers)
        # tiers_list finally holds elements like (2, 3, '127.0.0.1:6000', 2),
        # i.e. each device's (region, zone, ip:port, id)
        tiers_list = new_tiers_list
        depth += 1
    # each item is e.g. (1, [0, 1, 2]); there are 2**part_power partitions
    for part, replace_replicas in reassign_parts:
        # Gather up what other tiers (regions, zones, ip/ports, and
        # devices) the replicas not-to-be-moved are in for this part.
        other_replicas = defaultdict(int)
        unique_tiers_by_tier_len = defaultdict(set)
        # _replicas_for_part yields the replica indices for a partition
        for replica in self._replicas_for_part(part):
            # if this replica of the current part is not being reassigned
            if replica not in replace_replicas:
                dev = self.devs[self._replica2part2dev[replica][part]]
                for tier in dev['tiers']:
                    other_replicas[tier] += 1
                    unique_tiers_by_tier_len[len(tier)].add(tier)

        for replica in replace_replicas:
            tier = ()
            depth = 1
            # this loop walks down the tier tree, ordering tiers by how many
            # replicas of this partition they already hold
            while depth <= max_tier_depth:
                # Order the tiers by how many replicas of this
                # partition they already have. Then, of the ones
                # with the smallest number of replicas, pick the
                # tier with the hungriest drive and then continue
                # searching in that subtree (so among the tiers holding
                # the fewest replicas, the hungriest device wins).
                #
                # There are other strategies we could use here,
                # such as hungriest-tier (i.e. biggest
                # sum-of-parts-wanted) or picking one at random.
                # However, hungriest-drive is what was used here
                # before, and it worked pretty well in practice.
                #
                # Note that this allocator will balance things as
                # evenly as possible at each level of the device
                # layout. If your layout is extremely unbalanced,
                # this may produce poor results.
                #
                # This used to be a cute, recursive function, but it's been
                # unrolled for performance.
                # We sort the tiers here so that, when we look for a tier
                # with the lowest number of replicas, the first one we
                # find is the one with the hungriest drive (i.e. drive
                # with the largest sort_key value). This lets us
                # short-circuit the search while still ensuring we get the
                # right tier.
                candidates_with_replicas = \
                    unique_tiers_by_tier_len[len(tier) + 1]
                # Find a tier with the minimal replica count and the
                # hungriest drive among all the tiers with the minimal
                # replica count.
                if len(tier2children[tier]) > \
                        len(candidates_with_replicas):
                    # There exists at least one tier with 0 other replicas
                    tier = max((t for t in tier2children[tier]
                                if other_replicas[t] == 0),
                               key=tier2sort_key.__getitem__)
                else:
                    tier = max(tier2children[tier],
                               key=lambda t: (-other_replicas[t],
                                              tier2sort_key[t]))
                depth += 1
            # the steps above have now selected a single tier (one device);
            # tier2devs[tier] is sorted ascending by sort_key, so this dev
            # is the one with the largest sort_key
            dev = tier2devs[tier][-1]
            dev['parts_wanted'] -= 1
            dev['parts'] += 1
            old_sort_key = dev['sort_key']
            # after updating parts_wanted and parts, generate a new sort_key
            new_sort_key = dev['sort_key'] = self._sort_key_for(dev)
            # since the dev's parts_wanted and parts changed, it must be
            # re-inserted to keep the sorted structures consistent
            for tier in dev['tiers']:
                other_replicas[tier] += 1
                unique_tiers_by_tier_len[len(tier)].add(tier)

                # locate old_sort_key's position in tier2dev_sort_key[tier]
                # (bisect_left only finds the position, it does not insert)
                index = bisect.bisect_left(tier2dev_sort_key[tier],
                                           old_sort_key)
                # the next two lines remove the dev's old entry from
                # tier2devs and tier2dev_sort_key
                tier2devs[tier].pop(index)
                tier2dev_sort_key[tier].pop(index)

                # find the dev's new position from the new sort_key and
                # insert it into tier2devs and tier2dev_sort_key there
                new_index = bisect.bisect_left(tier2dev_sort_key[tier],
                                               new_sort_key)
                tier2devs[tier].insert(new_index, dev)
                tier2dev_sort_key[tier].insert(new_index, new_sort_key)

                # update tier2sort_key so it again holds the largest
                # sort_key in this tier
                new_last_sort_key = tier2dev_sort_key[tier][-1]
                tier2sort_key[tier] = new_last_sort_key

                # Now jiggle tier2children values to keep them sorted
                # the parent tier; e.g. the parent of (1, 2) is (1,)
                parent_tier = tier[0:-1]
                # find this tier's position among its parent's children
                index = bisect.bisect_left(
                    tier2children_sort_key[parent_tier],
                    old_sort_key)
                # pop this tier out (and keep a reference to it)
                popped = tier2children[parent_tier].pop(index)
                # pop the old sort_key out of tier2children_sort_key
                tier2children_sort_key[parent_tier].pop(index)

                # find where this tier should now sit among its parent's
                # children, based on its new largest sort_key
                new_index = bisect.bisect_left(
                    tier2children_sort_key[parent_tier],
                    new_last_sort_key)
                # re-insert the popped tier at its new position
                tier2children[parent_tier].insert(new_index, popped)
                # and record the new sort_key at the matching position
                tier2children_sort_key[parent_tier].insert(
                    new_index, new_last_sort_key)

            self._replica2part2dev[replica][part] = dev['id']
This function is what fills in _replica2part2dev. While assigning, it must keep a partition's replicas in different zones as far as possible, and when choosing a concrete device it always prefers the one with the largest parts_wanted. The process is fairly intricate and rewards careful study; the core device-selection trick is illustrated in the small sketch below.
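Here is a toy illustration (my own sketch, not Swift code) of the "hungriest device" bookkeeping: devices are kept sorted by (parts_wanted, random tiebreak, id), the last element is always the hungriest, and after each assignment the device is re-inserted with bisect, which is exactly the per-tier trick _reassign_parts uses:

import bisect
import random

devs = [{'id': i, 'parts_wanted': w, 'parts': 0}
        for i, w in enumerate([126, 126, 3])]

def sort_key_for(dev):
    # the random middle element breaks ties between equally hungry devices
    return (dev['parts_wanted'], random.randint(0, 0xFFFF), dev['id'])

for d in devs:
    d['sort_key'] = sort_key_for(d)
devs.sort(key=lambda d: d['sort_key'])
sort_keys = [d['sort_key'] for d in devs]

for part in range(8):
    dev = devs[-1]                 # largest sort_key == hungriest device
    dev['parts_wanted'] -= 1
    dev['parts'] += 1
    old_key = dev['sort_key']
    new_key = dev['sort_key'] = sort_key_for(dev)
    # remove the stale entry and re-insert at the right place, keeping
    # both lists sorted -- the same pop/insert dance as _reassign_parts
    index = bisect.bisect_left(sort_keys, old_key)
    devs.pop(index)
    sort_keys.pop(index)
    new_index = bisect.bisect_left(sort_keys, new_key)
    devs.insert(new_index, dev)
    sort_keys.insert(new_index, new_key)
    print 'part %d -> dev %d' % (part, dev['id'])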
After rebalance finishes, the attributes of the RingBuilder instance are populated. Back in the Commands class's rebalance, after the try...except blocks the data is serialized again, this time into two files. The first is the object.builder file that create and add already wrote; it holds the data for creating and maintaining the Ring. The second is object.ring.gz, which holds the data the system reads when it actually uses the Ring. The relevant fragment:
builder.get_ring().save(
    pathjoin(backup_dir, '%d.' % ts + basename(ring_file)))
# write timestamped backups, then the live ring file
builder.save(pathjoin(backup_dir, '%d.' % ts + basename(argv[1])))
builder.get_ring().save(ring_file)
Here builder's get_ring method returns a RingData instance, which is then serialized (its data written into object.ring.gz). Let's examine RingData's __init__ to see exactly what data lands on disk:
def __init__(self, replica2part2dev_id, devs, part_shift):
    self.devs = devs
    self._replica2part2dev_id = replica2part2dev_id
    self._part_shift = part_shift
    # each dev looks like dev0 = {'region': 1, 'zone': 1, 'ip': '192.168.1.1',
    #                             'port': '6000', 'id': 0}
    for dev in self.devs:
        if dev is not None:
            dev.setdefault("region", 1)
As the function shows, what gets serialized is exactly replica2part2dev_id (the replica-to-partition-to-device mapping), devs (all of the devices), and part_shift (the number of bits to shift right during lookups).
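You can verify this yourself by loading the serialized file back with RingData.load; a minimal sketch, assuming a rebalanced object.ring.gz sits in the current directory:

from swift.common.ring import RingData

rd = RingData.load('object.ring.gz')
print 'part_shift:', rd._part_shift
print 'replica count:', len(rd._replica2part2dev_id)
print 'partition count:', len(rd._replica2part2dev_id[0])
print 'first device:', rd.devs[0]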
That completes the walkthrough of rebalance. Next, let's look at how outside code uses the Ring's data.
2 Using the Ring Data
As section 1 explained, after rebalance the data served to the outside world lives in the object.ring.gz file. How is that data consumed? Look at the Ring class in swift/common/ring/ring.py, which provides the data-access interface:
class Ring(object):
    """
    Partitioned consistent hashing ring.

    :param serialized_path: path to serialized RingData instance
    :param reload_time: time interval in seconds to check for a ring change
    """

    def __init__(self, serialized_path, reload_time=15, ring_name=None):
        # can't use the ring unless HASH_PATH_SUFFIX is set
        validate_configuration()
        if ring_name:
            self.serialized_path = os.path.join(serialized_path,
                                                ring_name + '.ring.gz')
        else:
            self.serialized_path = os.path.join(serialized_path)
        self.reload_time = reload_time
        self._reload(force=True)
    def _reload(self, force=False):
        self._rtime = time() + self.reload_time
        if force or self.has_changed():
            ring_data = RingData.load(self.serialized_path)
            self._mtime = getmtime(self.serialized_path)
            self._devs = ring_data.devs
            # NOTE(akscram): Replication parameters like replication_ip
            #                and replication_port are required for
            #                replication process. An old replication
            #                ring doesn't contain this parameters into
            #                device. Old-style pickled rings won't have
            #                region information.
            for dev in self._devs:
                if dev:
                    dev.setdefault('region', 1)
                    if 'ip' in dev:
                        dev.setdefault('replication_ip', dev['ip'])
                    if 'port' in dev:
                        dev.setdefault('replication_port', dev['port'])

            self._replica2part2dev_id = ring_data._replica2part2dev_id
            self._part_shift = ring_data._part_shift
            self._rebuild_tier_data()

            # Do this now, when we know the data has changed, rather then
            # doing it on every call to get_more_nodes().
            regions = set()
            zones = set()
            ip_ports = set()
            self._num_devs = 0
            for dev in self._devs:
                if dev:
                    regions.add(dev['region'])
                    zones.add((dev['region'], dev['zone']))
                    ip_ports.add((dev['region'], dev['zone'],
                                  dev['ip'], dev['port']))
                    self._num_devs += 1
            self._num_regions = len(regions)
            self._num_zones = len(zones)
            self._num_ip_ports = len(ip_ports)
    def _rebuild_tier_data(self):
        self.tier2devs = defaultdict(list)
        for dev in self._devs:
            if not dev:
                continue
            for tier in tiers_for_dev(dev):
                self.tier2devs[tier].append(dev)

        tiers_by_length = defaultdict(list)
        for tier in self.tier2devs:
            tiers_by_length[len(tier)].append(tier)
        self.tiers_by_length = sorted(tiers_by_length.values(),
                                      key=lambda x: len(x[0]))
        for tiers in self.tiers_by_length:
            tiers.sort()

    @property
    def replica_count(self):
        """Number of replicas (full or partial) used in the ring."""
        return len(self._replica2part2dev_id)

    @property
    def partition_count(self):
        """Number of partitions in the ring."""
        return len(self._replica2part2dev_id[0])

    @property
    def devs(self):
        """devices in the ring"""
        # re-check the on-disk ring every reload_time (15 s by default)
        if time() > self._rtime:
            self._reload()
        return self._devs
    def has_changed(self):
        """
        Check to see if the ring on disk is different than the current one in
        memory.

        :returns: True if the ring on disk has changed, False otherwise
        """
        return getmtime(self.serialized_path) != self._mtime

    def _get_part_nodes(self, part):
        part_nodes = []
        seen_ids = set()
        for r2p2d in self._replica2part2dev_id:
            if part < len(r2p2d):
                dev_id = r2p2d[part]
                if dev_id not in seen_ids:
                    part_nodes.append(self.devs[dev_id])
                seen_ids.add(dev_id)
        return part_nodes
    # used by _replicate_object in common/db_replicator.py
    def get_part(self, account, container=None, obj=None):
        """
        Get the partition for an account/container/object.

        :param account: account name
        :param container: container name
        :param obj: object name
        :returns: the partition number
        """
        key = hash_path(account, container, obj, raw_digest=True)
        if time() > self._rtime:
            self._reload()
        part = struct.unpack_from('>I', key)[0] >> self._part_shift
        return part
    def get_part_nodes(self, part):
        """
        Get the nodes that are responsible for the partition. If one
        node is responsible for more than one replica of the same
        partition, it will only appear in the output once.

        :param part: partition to get nodes for
        :returns: list of node dicts

        See :func:`get_nodes` for a description of the node dicts.
        """
        if time() > self._rtime:
            self._reload()
        return self._get_part_nodes(part)
    def get_nodes(self, account, container=None, obj=None):
        """
        Get the partition and nodes for an account/container/object.
        If a node is responsible for more than one replica, it will
        only appear in the output once.

        :param account: account name
        :param container: container name
        :param obj: object name
        :returns: a tuple of (partition, list of node dicts)

        Each node dict will have at least the following keys:

        ====== ===============================================================
        id     unique integer identifier amongst devices
        weight a float of the relative weight of this device as compared to
               others; this indicates how many partitions the builder will try
               to assign to this device
        zone   integer indicating which zone the device is in; a given
               partition will not be assigned to multiple devices within the
               same zone
        ip     the ip address of the device
        port   the tcp port of the device
        device the device's name on disk (sdb1, for example)
        meta   general use 'extra' field; for example: the online date, the
               hardware description
        ====== ===============================================================
        """
        part = self.get_part(account, container, obj)
        return part, self._get_part_nodes(part)
    # when a node fails, the replicator uses this method to find nodes to
    # stand in for it
    def get_more_nodes(self, part):
        """
        Generator to get extra nodes for a partition for hinted handoff.

        The handoff nodes will try to be in zones other than the
        primary zones, will take into account the device weights, and
        will usually keep the same sequences of handoffs even with
        ring changes.

        :param part: partition to get handoff nodes for
        :returns: generator of node dicts

        See :func:`get_nodes` for a description of the node dicts.
        """
        if time() > self._rtime:
            self._reload()
        primary_nodes = self._get_part_nodes(part)

        used = set(d['id'] for d in primary_nodes)
        same_regions = set(d['region'] for d in primary_nodes)
        same_zones = set((d['region'], d['zone']) for d in primary_nodes)
        same_ip_ports = set((d['region'], d['zone'], d['ip'], d['port'])
                            for d in primary_nodes)

        # parts = 2 ** part_power
        parts = len(self._replica2part2dev_id[0])
        # struct.unpack_from(fmt, buffer, offset=0) unpacks packed C-structure
        # data according to fmt; '>I' means a big-endian unsigned int
        start = struct.unpack_from(
            '>I', md5(str(part)).digest())[0] >> self._part_shift
        # step size through the partition space
        inc = int(parts / 65536) or 1
        # Multiple loops for execution speed; the checks and bookkeeping get
        # simpler as you go along
        hit_all_regions = len(same_regions) == self._num_regions
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_regions:
                # At this point, there are no regions left untouched, so we
                # can stop looking.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    region = dev['region']
                    if dev_id not in used and region not in same_regions:
                        yield dev
                        used.add(dev_id)
                        same_regions.add(region)
                        zone = dev['zone']
                        ip_port = (region, zone, dev['ip'], dev['port'])
                        same_zones.add((region, zone))
                        same_ip_ports.add(ip_port)
                        if len(same_regions) == self._num_regions:
                            hit_all_regions = True
                            break

        hit_all_zones = len(same_zones) == self._num_zones
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_zones:
                # Much like we stopped looking for fresh regions before, we
                # can now stop looking for fresh zones; there are no more.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    zone = (dev['region'], dev['zone'])
                    if dev_id not in used and zone not in same_zones:
                        yield dev
                        used.add(dev_id)
                        same_zones.add(zone)
                        ip_port = zone + (dev['ip'], dev['port'])
                        same_ip_ports.add(ip_port)
                        if len(same_zones) == self._num_zones:
                            hit_all_zones = True
                            break

        hit_all_ip_ports = len(same_ip_ports) == self._num_ip_ports
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_ip_ports:
                # We've exhausted the pool of unused backends, so stop
                # looking.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    ip_port = (dev['region'], dev['zone'],
                               dev['ip'], dev['port'])
                    if dev_id not in used and ip_port not in same_ip_ports:
                        yield dev
                        used.add(dev_id)
                        same_ip_ports.add(ip_port)
                        if len(same_ip_ports) == self._num_ip_ports:
                            hit_all_ip_ports = True
                            break

        hit_all_devs = len(used) == self._num_devs
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_devs:
                # We've used every device we have, so let's stop looking for
                # unused devices now.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    if dev_id not in used:
                        yield self._devs[dev_id]
                        used.add(dev_id)
                        if len(used) == self._num_devs:
                            hit_all_devs = True
                            break
Among these, the interfaces

get_part(self, account, container=None, obj=None)
get_part_nodes(self, part)
get_nodes(self, account, container=None, obj=None)
get_more_nodes(self, part)

deserialize the data in the *.ring.gz file and serve it to callers; a usage sketch follows below.
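Here is a minimal usage sketch of these interfaces; the ring directory /etc/swift, the ring name object, and the account/container/object names are illustrative:

from swift.common.ring import Ring

ring = Ring('/etc/swift', ring_name='object')
part = ring.get_part('AUTH_test', 'photos', 'cat.jpg')
part2, nodes = ring.get_nodes('AUTH_test', 'photos', 'cat.jpg')
assert part == part2
for node in nodes:                           # primary devices for the partition
    print node['ip'], node['port'], node['device']
handoffs = list(ring.get_more_nodes(part))   # fallbacks in other regions/zones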
Now let's revisit the last figure from OpenStack_Swift Source Code Analysis — Ring Fundamentals and the Consistent Hashing Algorithm.

Figure 1: How the ring works
Following the relationships in the figure: to store an object, we take the MD5 hash of its account/container/object path, read the first four bytes (32 bits), and shift them right by part_shift bits, which yields the object's partition number. Then replica2part2dev_id (the replica-to-partition-to-device mapping) gives the device ids of that partition's three replicas. With those ids we look up the concrete devs, each of which records the device's ip, port, and the disk that stores the data, and we issue a request to each of the three devices to write the object.
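For concreteness, here is a minimal sketch of that lookup arithmetic. The partition power of 6 and the hash-path suffix are assumptions for illustration only; a real cluster mixes HASH_PATH_PREFIX/HASH_PATH_SUFFIX from swift.conf into the hash and typically uses a much larger partition power:

import struct
from hashlib import md5

PART_POWER = 6               # assumed; the ring stores part_shift = 32 - power
PART_SHIFT = 32 - PART_POWER

# hypothetical path and suffix, for illustration only
key = md5('/account/container/object' + 'swift_hash_suffix').digest()
part = struct.unpack_from('>I', key)[0] >> PART_SHIFT
print 'partition:', part     # an integer in [0, 2 ** PART_POWER)

# replica2part2dev_id then yields one device id per replica:
# dev_ids = [r2p2d[part] for r2p2d in replica2part2dev_id]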
That concludes this section. In subsequent articles I will describe how the mapping in the ring is maintained after devices are added or removed.
My own understanding is limited, so mistakes are inevitable; corrections and discussion are welcome. Thank you!