构建一致性哈希环

作者:Greg Holt,2011年2月

这是我对如何构建一致性哈希环的五篇早期文章的汇编。这些文章似乎被访问的频率很高,所以我把它们都放在了一页上,以便更轻松地阅读。

注意

这是一个历史文档;因此,所有代码示例都是 Python 2。如果你对此感到不安,可以把它看作伪代码。无论实现语言如何,一致性哈希和分布式系统更广泛领域的最新技术都取得了进步。我们希望这个从第一原理出发的介绍仍然具有参考价值,尤其是在数据如何在 Swift 集群中分发方面。

第一部分

“一致性哈希”是一个术语,用于描述使用哈希算法确定其位置来分发数据的一种过程。仅使用数据 ID 的哈希值,就可以确定该数据应该位于何处。哈希到位置的这种映射通常被称为“环”。

最简单的哈希可能是 ID 的模运算。例如,如果所有 ID 都是数字,并且你希望将数据分发到两台机器上,可以将所有奇数 ID 放在一台机器上,将所有偶数 ID 放在另一台机器上。假设你拥有平衡数量的奇数和偶数 ID,并且每个 ID 的数据大小平衡,你的数据将在两台机器之间平衡。

由于数据 ID 通常是文本名称而不是数字,例如文件路径或 URL,因此使用“真实”哈希算法将名称转换为数字是有意义的。例如,使用 MD5,名称 ‘mom.png’ 的哈希值为 ‘4559a12e3e8da7c2186250c2f292e3af’,名称 ‘dad.png’ 的哈希值为 ‘096edcc4107e9e18d6a03a43b3853bea’。现在,使用模运算,我们可以将 ‘mom.jpg’ 放在奇数机器上,将 ‘dad.png’ 放在偶数机器上。使用像 MD5 这样的哈希算法的另一个好处是,生成的哈希值具有已知的均匀分布,这意味着你的 ID 将均匀分布,而无需担心保持 ID 值本身均匀分布。

这是一个简单的示例,展示了它的实际效果

from hashlib import md5
from struct import unpack_from

NODE_COUNT = 100
DATA_ID_COUNT = 10000000

node_counts = [0] * NODE_COUNT
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    # This just pulls part of the hash out as an integer
    hsh = unpack_from('>I', md5(data_id).digest())[0]
    node_id = hsh % NODE_COUNT
    node_counts[node_id] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)
100000: Desired data ids per node
100695: Most data ids on one node, 0.69% over
99073: Least data ids on one node, 0.93% under

这还不错;每个节点的分布略低于百分之一。在本系列下一部分中,我们将研究模运算分布导致的问题以及如何改进我们的环以克服这些问题。

第二部分

在本系列第一部分中,我们对使用哈希的模运算来定位数据进行了简单的测试。我们看到了非常好的分布,但这只是故事的一部分。分布式系统不仅需要分发负载,还需要随着越来越多的数据放入其中而扩展。

所以,假设我们有一个由 100 个节点组成的系统正在运行,使用我们之前的算法,但它开始变得拥满,所以我们想添加另一个节点。当我们向算法中添加第 101 个节点时,我们注意到许多 ID 现在映射到与之前不同的节点。我们需要在我们的系统中四处移动大量数据才能将其全部放置到位。

让我们以更小的规模来研究发生了什么:只有 2 个节点,节点 0 获取偶数 ID,节点 1 获取奇数 ID。所以数据 ID 100 将映射到节点 0,数据 ID 101 到节点 1,数据 ID 102 到节点 0,等等。这只是 node = id % 2。现在我们添加第三个节点(节点 2)以获得更多空间,所以我们希望 node = id % 3。所以现在数据 ID 100 映射到节点 ID 1,数据 ID 101 到节点 2,数据 ID 102 到节点 0。所以我们需要移动 2 个 ID 中的数据,以便再次找到它们。

让我们以更大的规模来研究一下

from hashlib import md5
from struct import unpack_from

NODE_COUNT = 100
NEW_NODE_COUNT = 101
DATA_ID_COUNT = 10000000

moved_ids = 0
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
    node_id = hsh % NODE_COUNT
    new_node_id = hsh % NEW_NODE_COUNT
    if node_id != new_node_id:
        moved_ids += 1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)
9900989 ids moved, 99.01%

哇,这很严重。仅仅为了增加 1% 的容量,我们就必须移动 99% 的数据!我们需要一种新的算法来对抗这种行为。

这就是“环”真正发挥作用的地方。我们可以直接将哈希范围分配给节点,然后使用一种算法来最大限度地减少这些范围的变化。回到我们的较小规模,假设我们的 ID 范围为 0 到 999。我们有两个节点,并将数据 ID 0–499 分配给节点 0,将 500–999 分配给节点 1。稍后,当我们添加节点 2 时,我们可以从节点 0 和节点 1 中各取一半的数据 ID,从而最大限度地减少需要移动的数据量。

让我们以更大的规模来研究一下

from bisect import bisect_left
from hashlib import md5
from struct import unpack_from

NODE_COUNT = 100
NEW_NODE_COUNT = 101
DATA_ID_COUNT = 10000000

node_range_starts = []
for node_id in range(NODE_COUNT):
    node_range_starts.append(DATA_ID_COUNT /
                             NODE_COUNT * node_id)
new_node_range_starts = []
for new_node_id in range(NEW_NODE_COUNT):
    new_node_range_starts.append(DATA_ID_COUNT /
                              NEW_NODE_COUNT * new_node_id)
moved_ids = 0
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
    node_id = bisect_left(node_range_starts,
                          hsh % DATA_ID_COUNT) % NODE_COUNT
    new_node_id = bisect_left(new_node_range_starts,
                          hsh % DATA_ID_COUNT) % NEW_NODE_COUNT
    if node_id != new_node_id:
        moved_ids += 1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)
4901707 ids moved, 49.02%

好的,这更好。但仍然,为了增加 1% 的容量而移动 50% 的数据并不是很好。如果我们更仔细地检查发生了什么,我们会看到一种“手风琴效应”。我们缩小了节点 0 的范围,以便给予新节点,但这会将其他节点的范围都移动了相同的量。

我们可以通过为每个节点创建“虚拟节点”来最大限度地减少节点分配范围的变化。例如,100 个节点可能有 1000 个虚拟节点。让我们研究一下这可能如何工作。

from bisect import bisect_left
from hashlib import md5
from struct import unpack_from

NODE_COUNT = 100
DATA_ID_COUNT = 10000000
VNODE_COUNT = 1000

vnode_range_starts = []
vnode2node = []
for vnode_id in range(VNODE_COUNT):
    vnode_range_starts.append(DATA_ID_COUNT /
                              VNODE_COUNT * vnode_id)
    vnode2node.append(vnode_id % NODE_COUNT)
new_vnode2node = list(vnode2node)
new_node_id = NODE_COUNT
NEW_NODE_COUNT = NODE_COUNT + 1
vnodes_to_reassign = VNODE_COUNT / NEW_NODE_COUNT
while vnodes_to_reassign > 0:
    for node_to_take_from in range(NODE_COUNT):
        for vnode_id, node_id in enumerate(new_vnode2node):
            if node_id == node_to_take_from:
                new_vnode2node[vnode_id] = new_node_id
                vnodes_to_reassign -= 1
                break
        if vnodes_to_reassign <= 0:
            break
moved_ids = 0
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
    vnode_id = bisect_left(vnode_range_starts,
                         hsh % DATA_ID_COUNT) % VNODE_COUNT
    node_id = vnode2node[vnode_id]
    new_node_id = new_vnode2node[vnode_id]
    if node_id != new_node_id:
        moved_ids += 1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)
90423 ids moved, 0.90%

就这样,我们增加了 1% 的容量,并且只移动了 0.9% 的现有数据。vnode_range_starts 列表看起来有点不合适。它的值被计算出来并且在集群的整个生命周期中都不会改变,所以让我们优化它。

from bisect import bisect_left
from hashlib import md5
from struct import unpack_from

NODE_COUNT = 100
DATA_ID_COUNT = 10000000
VNODE_COUNT = 1000

vnode2node = []
for vnode_id in range(VNODE_COUNT):
    vnode2node.append(vnode_id % NODE_COUNT)
new_vnode2node = list(vnode2node)
new_node_id = NODE_COUNT
vnodes_to_reassign = VNODE_COUNT / (NODE_COUNT + 1)
while vnodes_to_reassign > 0:
    for node_to_take_from in range(NODE_COUNT):
        for vnode_id, node_id in enumerate(vnode2node):
            if node_id == node_to_take_from:
                vnode2node[vnode_id] = new_node_id
                vnodes_to_reassign -= 1
                break
        if vnodes_to_reassign <= 0:
            break
moved_ids = 0
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
    vnode_id = hsh % VNODE_COUNT
    node_id = vnode2node[vnode_id]
    new_node_id = new_vnode2node[vnode_id]
    if node_id != new_node_id:
        moved_ids += 1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)
89841 ids moved, 0.90%

就这样。在本系列下一部分中,我们将进一步研究该算法的局限性以及如何改进它。

第三部分

在本系列第二部分中,我们获得了一种算法,即使在向集群添加新节点时也能表现良好。我们使用了 1000 个虚拟节点,这些节点可以独立分配给节点,从而使我们能够在添加节点时最大限度地减少需要移动的数据量。

虚拟节点的数量对你可以拥有的真实节点的数量设置了上限。例如,如果你有 1000 个虚拟节点,并且尝试添加第 1001 个真实节点,你无法为其分配虚拟节点,而又不让另一个真实节点没有分配,从而使你仍然只有 1000 个活动的真实节点。

不幸的是,在集群的整个生命周期中,最初创建的虚拟节点数量无法更改,而无需进行大量的细致工作。例如,你可以通过将每个现有的虚拟节点拆分为两半并将两半都分配给相同的真实节点来将虚拟节点计数加倍。但是,如果真实节点使用虚拟节点的 ID 来优化存储数据(例如,所有数据都存储在 /[virtual node id]/[data id]),它将不得不移动数据以反映更改。并且在移动发生时,它将不得不使用新的和旧的位置来解析数据,这使得原子操作变得困难或不可能。

让我们继续假设更改虚拟节点计数的工作量大于其价值,但请记住,某些应用程序对此可能没问题。

处理此限制的最简单方法是使限制足够高,以至于它无关紧要。例如,如果我们确定我们的集群永远不会超过 60,000 个真实节点,我们可以只创建 60,000 个虚拟节点。

此外,我们还应该在计算中包含节点的相对大小。例如,一年后,我们可能会拥有能够处理当前节点两倍容量的真实节点。因此,我们希望将两倍的虚拟节点分配给这些未来的节点,所以我们也许应该将我们的虚拟节点估计值提高到 120,000。

一个好的规则可能是为每个最大容量的真实节点计算 100 个虚拟节点。这将允许你改变任何给定节点上的负载 1%,即使在最大容量下,这也是相当精细的调整。所以现在我们有 6,000,000 个虚拟节点用于最大容量的 60,000 个真实节点集群。

600 万个虚拟节点看起来很多,而且可能看起来我们会消耗太多的内存。但唯一受此影响的结构是虚拟节点到真实节点的映射。所需的基准内存量将是 600 万次 2 字节(用于存储从 0 到 65,535 的真实节点 ID)。12 兆字节的内存现在并不多。

即使考虑到灵活的数据类型的开销,情况也不太糟糕。我更改了本系列上一部分的中的代码,使其具有 60,000 个真实节点和 6,000,000 个虚拟节点,将列表更改为数组(‘H’),并且 python 的驻留内存达到了 27m——而且这包括两个环。

为了改变术语,我们将开始将这些虚拟节点称为“分区”。这将使区分我们迄今为止谈论的两种类型的节点更容易。此外,将分区视为它们只是哈希空间中不变的部分是有意义的。

我们还将始终使分区计数为 2 的幂。这使得仅使用位操作对哈希进行分区变得容易,而不是模运算。它速度不会快很多,但稍微快一点。所以,这是我们更新的环代码,使用 8,388,608 (2 ** 23) 个分区和 65,536 个节点。我们增加了示例数据 ID 集并检查了分布,以确保我们没有破坏任何东西。

from array import array
from hashlib import md5
from struct import unpack_from

PARTITION_POWER = 23
PARTITION_SHIFT = 32 - PARTITION_POWER
NODE_COUNT = 65536
DATA_ID_COUNT = 100000000

part2node = array('H')
for part in range(2 ** PARTITION_POWER):
    part2node.append(part % NODE_COUNT)
node_counts = [0] * NODE_COUNT
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    part = unpack_from('>I',
        md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
    node_id = part2node[part]
    node_counts[node_id] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)
1525: Desired data ids per node
1683: Most data ids on one node, 10.36% over
1360: Least data ids on one node, 10.82% under

嗯。+–10% 似乎有点高,但我用 65,536 个分区和 256 个节点重新运行,得到了 +–0.4%,所以只是我们的样本大小(1 亿)对于我们的分区数(800 万)来说太小了。运行更大的样本大小的实验将花费太长时间,所以让我们减少回这些较小的数字。(为了确定,我用 100 亿个数据 ID 样本集在完整版本上重新运行,得到了 +–1%,但运行时间为 6.5 小时。)

在本系列下一部分中,我们将讨论如何提高集群中数据的耐久性。

第四部分

在本系列第三部分中,我们进一步讨论了分区(虚拟节点)并根据此清理了我们的代码。现在,让我们讨论如何在集群中提高数据的耐久性和可用性。

对于许多分布式数据存储,耐久性非常重要。需要 RAID 阵列或单独不同的数据副本。虽然 RAID 会提高耐久性,但它不会提高可用性——如果 RAID 机器崩溃,数据可能安全但无法访问,直到完成维修。如果我们保留不同机器上的不同副本,并且一台机器崩溃,其他副本仍然可用,而我们可以修复损坏的机器。

获得这种多副本耐久性/可用性的一个简单方法是使用多个环和节点组。例如,为了实现行业标准的三个副本,你可以将节点分成三个组,并且每个组都有自己的环,并且每个环都会收到每个数据项的副本。这可以很好地工作,但缺点是扩展容量需要一次添加三个节点,并且失去一个节点基本上会降低容量是该节点容量的三倍。

相反,让我们使用一种不同的但常用的方法来满足我们的要求,使用单个环。这可以通过从起始点沿着环行走并查找其他不同的节点来完成。以下代码支持可变的副本数(设置为 3 以进行测试)

from array import array
from hashlib import md5
from struct import unpack_from

REPLICAS = 3
PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER
PARTITION_MAX = 2 ** PARTITION_POWER - 1
NODE_COUNT = 256
DATA_ID_COUNT = 10000000

part2node = array('H')
for part in range(2 ** PARTITION_POWER):
    part2node.append(part % NODE_COUNT)
node_counts = [0] * NODE_COUNT
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    part = unpack_from('>I',
        md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
    node_ids = [part2node[part]]
    node_counts[node_ids[0]] += 1
    for replica in range(1, REPLICAS):
        while part2node[part] in node_ids:
            part += 1
            if part > PARTITION_MAX:
                part = 0
        node_ids.append(part2node[part])
        node_counts[node_ids[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)
117186: Desired data ids per node
118133: Most data ids on one node, 0.81% over
116093: Least data ids on one node, 0.93% under

这很好;略低于 1% 的偏差。虽然这有效,但存在一些问题。

首先,由于我们最初将分区分配给节点的方式,给定节点的所有分区都有其额外的副本位于相同的另外两个节点上。这里的问题是,当一台机器发生故障时,这些其他节点的负载会增加那么多。最好在最初的分配中对分区进行混洗,以更好地分发故障转移负载。

另一个问题有点难解释,但与机器的物理分离有关。想象一下,你只能在数据中心的一个机架中放置 16 台机器。我们一直在使用的 256 个节点将填充 16 个机架。使用我们当前的编码,如果一个机架发生故障(电源问题、网络问题等),很可能会有一些数据在其所有三个副本都在该机架中,从而无法访问。我们可以通过向节点添加区域的概念,然后确保副本存储在不同的区域中来修复这个缺点。

from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from

REPLICAS = 3
PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER
PARTITION_MAX = 2 ** PARTITION_POWER - 1
NODE_COUNT = 256
ZONE_COUNT = 16
DATA_ID_COUNT = 10000000

node2zone = []
while len(node2zone) < NODE_COUNT:
    zone = 0
    while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
        node2zone.append(zone)
        zone += 1
part2node = array('H')
for part in range(2 ** PARTITION_POWER):
    part2node.append(part % NODE_COUNT)
shuffle(part2node)
node_counts = [0] * NODE_COUNT
zone_counts = [0] * ZONE_COUNT
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    part = unpack_from('>I',
        md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
    node_ids = [part2node[part]]
    zones = [node2zone[node_ids[0]]]
    node_counts[node_ids[0]] += 1
    zone_counts[zones[0]] += 1
    for replica in range(1, REPLICAS):
        while part2node[part] in node_ids and \
                node2zone[part2node[part]] in zones:
            part += 1
            if part > PARTITION_MAX:
                part = 0
        node_ids.append(part2node[part])
        zones.append(node2zone[node_ids[-1]])
        node_counts[node_ids[-1]] += 1
        zone_counts[zones[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)
desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
print '%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids in one zone, %.02f%% over' % \
    (max_count, over)
min_count = min(zone_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids in one zone, %.02f%% under' % \
    (min_count, under)
117186: Desired data ids per node
118782: Most data ids on one node, 1.36% over
115632: Least data ids on one node, 1.33% under
1875000: Desired data ids per zone
1878533: Most data ids in one zone, 0.19% over
1869070: Least data ids in one zone, 0.32% under

因此,混洗和区域区分会影响我们的分布,但仍然足够好。此测试在我的机器上花费了大约 64 秒。

有一种完全不同的,而且非常常见的方法可以实现相同的要求。这种替代方法根本不使用分区,而是将锚点分配给哈希空间中的节点。对于给定的哈希,找到第一个节点只需沿着这个锚点环找到下一个节点,而找到其他节点则以类似的方式进行。为了获得我们虚拟节点的等效效果,每个真实节点都会被分配多个锚点。

from bisect import bisect_left
from hashlib import md5
from struct import unpack_from

REPLICAS = 3
NODE_COUNT = 256
ZONE_COUNT = 16
DATA_ID_COUNT = 10000000
VNODE_COUNT = 100

node2zone = []
while len(node2zone) < NODE_COUNT:
    zone = 0
    while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
        node2zone.append(zone)
        zone += 1
hash2index = []
index2node = []
for node in range(NODE_COUNT):
    for vnode in range(VNODE_COUNT):
        hsh = unpack_from('>I', md5(str(node)).digest())[0]
        index = bisect_left(hash2index, hsh)
        if index > len(hash2index):
            index = 0
        hash2index.insert(index, hsh)
        index2node.insert(index, node)
node_counts = [0] * NODE_COUNT
zone_counts = [0] * ZONE_COUNT
for data_id in range(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
    index = bisect_left(hash2index, hsh)
    if index >= len(hash2index):
        index = 0
    node_ids = [index2node[index]]
    zones = [node2zone[node_ids[0]]]
    node_counts[node_ids[0]] += 1
    zone_counts[zones[0]] += 1
    for replica in range(1, REPLICAS):
        while index2node[index] in node_ids and \
                node2zone[index2node[index]] in zones:
            index += 1
            if index >= len(hash2index):
                index = 0
        node_ids.append(index2node[index])
        zones.append(node2zone[node_ids[-1]])
        node_counts[node_ids[-1]] += 1
        zone_counts[zones[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)
desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
print '%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids in one zone, %.02f%% over' % \
    (max_count, over)
min_count = min(zone_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids in one zone, %.02f%% under' % \
    (min_count, under)
117186: Desired data ids per node
351282: Most data ids on one node, 199.76% over
15965: Least data ids on one node, 86.38% under
1875000: Desired data ids per zone
2248496: Most data ids in one zone, 19.92% over
1378013: Least data ids in one zone, 26.51% under

此测试花费了超过 15 分钟!不幸的是,这种方法也提供了更少的分布控制。为了获得更好的分布,你必须添加更多的虚拟节点,这会消耗更多的内存并花费更多的时间来构建环和执行不同的节点查找。最常见的操作,数据 ID 查找,可以改进(例如,预先确定每个虚拟节点的故障转移节点),但它从我们的第一个方法开始就落后太多,所以我们将坚持使用该方法。

在本系列下一部分中,我们将开始将所有这些内容打包到一个有用的 Python 模块中。

第五部分

在本系列第四部分中,我们最终得到了一个多副本、明确区域化的环。或者至少是它的开始。在本最终部分中,我们将把代码打包成一个可用的 Python 模块,然后添加一个最后的特性。首先,让我们将环本身与构建环的数据及其测试分开。

from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
from time import time

class Ring(object):

    def __init__(self, nodes, part2node, replicas):
        self.nodes = nodes
        self.part2node = part2node
        self.replicas = replicas
        partition_power = 1
        while 2 ** partition_power < len(part2node):
            partition_power += 1
        if len(part2node) != 2 ** partition_power:
            raise Exception("part2node's length is not an "
                            "exact power of 2")
        self.partition_shift = 32 - partition_power

    def get_nodes(self, data_id):
        data_id = str(data_id)
        part = unpack_from('>I',
           md5(data_id).digest())[0] >> self.partition_shift
        node_ids = [self.part2node[part]]
        zones = [self.nodes[node_ids[0]]]
        for replica in range(1, self.replicas):
            while self.part2node[part] in node_ids and \
                   self.nodes[self.part2node[part]] in zones:
                part += 1
                if part >= len(self.part2node):
                    part = 0
            node_ids.append(self.part2node[part])
            zones.append(self.nodes[node_ids[-1]])
        return [self.nodes[n] for n in node_ids]

def build_ring(nodes, partition_power, replicas):
    begin = time()
    part2node = array('H')
    for part in range(2 ** partition_power):
        part2node.append(part % len(nodes))
    shuffle(part2node)
    ring = Ring(nodes, part2node, replicas)
    print '%.02fs to build ring' % (time() - begin)
    return ring

def test_ring(ring):
    begin = time()
    DATA_ID_COUNT = 10000000
    node_counts = {}
    zone_counts = {}
    for data_id in range(DATA_ID_COUNT):
        for node in ring.get_nodes(data_id):
            node_counts[node['id']] = \
                node_counts.get(node['id'], 0) + 1
            zone_counts[node['zone']] = \
                zone_counts.get(node['zone'], 0) + 1
    print '%ds to test ring' % (time() - begin)
    desired_count = \
        DATA_ID_COUNT / len(ring.nodes) * REPLICAS
    print '%d: Desired data ids per node' % desired_count
    max_count = max(node_counts.values())
    over = \
        100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids on one node, %.02f%% over' % \
        (max_count, over)
    min_count = min(node_counts.values())
    under = \
        100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids on one node, %.02f%% under' % \
        (min_count, under)
    zone_count = \
        len(set(n['zone'] for n in ring.nodes.values()))
    desired_count = \
        DATA_ID_COUNT / zone_count * ring.replicas
    print '%d: Desired data ids per zone' % desired_count
    max_count = max(zone_counts.values())
    over = \
        100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids in one zone, %.02f%% over' % \
        (max_count, over)
    min_count = min(zone_counts.values())
    under = \
        100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids in one zone, %.02f%% under' % \
        (min_count, under)

if __name__ == '__main__':
    PARTITION_POWER = 16
    REPLICAS = 3
    NODE_COUNT = 256
    ZONE_COUNT = 16
    nodes = {}
    while len(nodes) < NODE_COUNT:
        zone = 0
        while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
            node_id = len(nodes)
            nodes[node_id] = {'id': node_id, 'zone': zone}
            zone += 1
    ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
    test_ring(ring)
0.06s to build ring
82s to test ring
117186: Desired data ids per node
118773: Most data ids on one node, 1.35% over
115801: Least data ids on one node, 1.18% under
1875000: Desired data ids per zone
1878339: Most data ids in one zone, 0.18% over
1869914: Least data ids in one zone, 0.27% under

测试我们的环需要更长的时间,但这主要是因为将各种项目从数组切换到字典。拥有节点字典很好,因为你可以直接在那里附加任何节点信息(IP 地址、TCP 端口、驱动器路径等)。但我们仍然在进行进一步的测试;我们的分布仍然很好。

现在,让我们为我们的环添加最后一个特性:权重的概念。权重很有用,因为你在环的生命周期中后期添加的节点,很可能比一开始的节点拥有更大的容量。为了这个测试,我们将使一半的节点拥有两倍的权重。我们需要修改 `build_ring` 函数,以便为权重更高的节点分配更多的分区,并修改 `test_ring` 函数,以考虑到这些权重。由于我们已经修改了很多,我将再次发布整个模块

from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
from time import time

class Ring(object):

    def __init__(self, nodes, part2node, replicas):
        self.nodes = nodes
        self.part2node = part2node
        self.replicas = replicas
        partition_power = 1
        while 2 ** partition_power < len(part2node):
            partition_power += 1
        if len(part2node) != 2 ** partition_power:
            raise Exception("part2node's length is not an "
                            "exact power of 2")
        self.partition_shift = 32 - partition_power

    def get_nodes(self, data_id):
        data_id = str(data_id)
        part = unpack_from('>I',
           md5(data_id).digest())[0] >> self.partition_shift
        node_ids = [self.part2node[part]]
        zones = [self.nodes[node_ids[0]]]
        for replica in range(1, self.replicas):
            while self.part2node[part] in node_ids and \
                   self.nodes[self.part2node[part]] in zones:
                part += 1
                if part >= len(self.part2node):
                    part = 0
            node_ids.append(self.part2node[part])
            zones.append(self.nodes[node_ids[-1]])
        return [self.nodes[n] for n in node_ids]

def build_ring(nodes, partition_power, replicas):
    begin = time()
    parts = 2 ** partition_power
    total_weight = \
        float(sum(n['weight'] for n in nodes.values()))
    for node in nodes.values():
        node['desired_parts'] = \
            parts / total_weight * node['weight']
    part2node = array('H')
    for part in range(2 ** partition_power):
        for node in nodes.values():
            if node['desired_parts'] >= 1:
                node['desired_parts'] -= 1
                part2node.append(node['id'])
                break
        else:
            for node in nodes.values():
                if node['desired_parts'] >= 0:
                    node['desired_parts'] -= 1
                    part2node.append(node['id'])
                    break
    shuffle(part2node)
    ring = Ring(nodes, part2node, replicas)
    print '%.02fs to build ring' % (time() - begin)
    return ring

def test_ring(ring):
    begin = time()
    DATA_ID_COUNT = 10000000
    node_counts = {}
    zone_counts = {}
    for data_id in range(DATA_ID_COUNT):
        for node in ring.get_nodes(data_id):
            node_counts[node['id']] = \
                node_counts.get(node['id'], 0) + 1
            zone_counts[node['zone']] = \
                zone_counts.get(node['zone'], 0) + 1
    print '%ds to test ring' % (time() - begin)
    total_weight = float(sum(n['weight'] for n in
                             ring.nodes.values()))
    max_over = 0
    max_under = 0
    for node in ring.nodes.values():
        desired = DATA_ID_COUNT * REPLICAS * \
            node['weight'] / total_weight
        diff = node_counts[node['id']] - desired
        if diff > 0:
            over = 100.0 * diff / desired
            if over > max_over:
                max_over = over
        else:
            under = 100.0 * (-diff) / desired
            if under > max_under:
                max_under = under
    print '%.02f%% max node over' % max_over
    print '%.02f%% max node under' % max_under
    max_over = 0
    max_under = 0
    for zone in set(n['zone'] for n in
                    ring.nodes.values()):
        zone_weight = sum(n['weight'] for n in
            ring.nodes.values() if n['zone'] == zone)
        desired = DATA_ID_COUNT * REPLICAS * \
            zone_weight / total_weight
        diff = zone_counts[zone] - desired
        if diff > 0:
            over = 100.0 * diff / desired
            if over > max_over:
                max_over = over
        else:
            under = 100.0 * (-diff) / desired
            if under > max_under:
                max_under = under
    print '%.02f%% max zone over' % max_over
    print '%.02f%% max zone under' % max_under

if __name__ == '__main__':
    PARTITION_POWER = 16
    REPLICAS = 3
    NODE_COUNT = 256
    ZONE_COUNT = 16
    nodes = {}
    while len(nodes) < NODE_COUNT:
        zone = 0
        while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
            node_id = len(nodes)
            nodes[node_id] = {'id': node_id, 'zone': zone,
                              'weight': 1.0 + (node_id % 2)}
            zone += 1
    ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
    test_ring(ring)
0.88s to build ring
86s to test ring
1.66% max over
1.46% max under
0.28% max zone over
0.23% max zone under

所以即使我们有不同权重的节点,一切仍然正常。我用这段代码运行了另一个测试,使用了从 1 到 100 的随机权重,得到的节点超/欠分配值为 7.35%/18.12%,区域超/欠分配值为 0.24%/0.22%,考虑到如此疯狂的权重范围,仍然相当不错。

摘要

希望这个系列能很好地介绍如何构建一个环。这段代码基本上就是 OpenStack Swift 环的工作方式,除了 Swift 的环拥有许多额外的优化,例如单独存储每个副本分配,以及许多额外的特性,用于构建、验证和以其他方式使用环。