rpc框架-负载均衡之一致性hash算法(仿Dubbo)

一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提供出的,算法提出之初是用于大规模缓存系统的负载均衡。本文仿Dubbo

参考:http://tianxiaobo.com/2018/11/29/Dubbo-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-%E9%9B%86%E7%BE%A4%E5%AE%B9%E9%94%99%E4%B9%8B-LoadBalance/#23-consistenthashloadbalance

简介:

一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提供出的,算法提出之初是用于大规模缓存系统的负载均衡。

它的工作过程是这样的,首先根据 ip 获取其他的信息为缓存节点生成一个 hash值,并将这个 hash 投射到 [0, 2^31^ -1] 的圆环上(那么这个hash值生成函数,当然要保证生成值在此区间中)。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash值的缓存节点即可。大致效果如下,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash值,则到该缓存节点中存储或读取缓存项。

如果我们的圆环上只有少数的缓存节点,且分布不均,那么就会出现大问题:

缓存节点1会得到大量的写入和查询请求。

所以,引入虚拟节点概念,目的是通过引入虚拟节点,让缓存节点在圆环上分散开来,避免数据倾斜问题。所谓数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量的请求。

hashCode()方法和identityHashCode()方法的关系:

https://www.cnblogs.com/godtrue/p/6341473.html

一个对象的hashCode和identityHashCode 的关系:

  1. 对象的hashCode,一般是通过将该对象的内部地址转换成一个整数来实现的。
  2. 当一个类没有重写Object类的hashCode()方法时,它的hashCode和identityHashCode是一致的
  3. 当一个类重写了Object类的hashCode()方法时,它的hashCode则有重写的实现逻辑决定,此时的hashCode值一般就不再和对象本身的内部地址有相应的哈希关系了
  4. 当null调用hashCode方法时,会抛出空指针异常,但是调用System.identityHashCode(null)方法时能正常的返回0这个值
  5. 一个对象的identityHashCode能够始终和该对象的内部地址有一个相对应的关系,从这个角度来讲,它可以用于代表对象的引用地址,所以,在理解==这个操作运算符的时候是比较有用的

hash方法:

1
2
3
private long hash(byte[] digest, int number) {
return ((long)(digest[3 + number * 4] & 255) << 24 | (long)(digest[2 + number * 4] & 255) << 16 | (long)(digest[1 + number * 4] & 255) << 8 | (long)(digest[number * 4] & 255)) & 4294967295L;
}

分析:

首先看doSelect方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
// 获取serviceAddresses 的hashCode
int identityHashCode = System.identityHashCode(serviceAddresses);

String rpcServiceName = rpcRequest.getRpcServiceName();

ConsistentHashSelector selector = selectors.get(rpcServiceName);

/**
* 如果 serviceAddresses 是一个新的 List 对象,意味着服务提供者数量发生了变化,可能新增也可能减少了。
* 此时 selector.identityHashCode != identityHashCode 条件成立
* identityHashCode是之前List<String> serviceAddresses对象的identityHashCode
* selector.identityHashCode是 ConsistentHashSelector类的identityHashCode
*/
if (selector == null || selector.identityHashCode != identityHashCode) {
// 创建新的ConsistentHashSelector
selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
selector = selectors.get(rpcServiceName);
}

// 调用ConsistentHashSelector 的 select 方法选择serviceAddresses
return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));
}

这里的selectors:

1
2
// key:String类型的rpcServiceName,value:内部类ConsistentHashSelector类型的
private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();

内部类ConsistentHashSelector:

这个内部类的作用就是保存一个TreeMap<Long, String> virtualInvokers虚拟节点的集合(注意,这只是一个虚拟节点),以及一个该集合的 identityHashCode.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
static class ConsistentHashSelector {
// 使用 TreeMap 存储 serviceAddresses 虚拟节点
private final TreeMap<Long, String> virtualInvokers;

// 就是List<String> serviceAddresses对象的identityHashCode
private final int identityHashCode;

// 内部类的构造方法
ConsistentHashSelector(List<String> invokers, int replicaNumber, int identityHashCode) {
this.virtualInvokers = new TreeMap<>();
this.identityHashCode = identityHashCode;
// invokers是String类型的serviceAddresses,一个serviceAddresses在virtualInvokers这个map中就有160个节点
for (String invoker : invokers) {
// 按照前面的传参replicaNumber=160,那么就是进行了40次运算
for (int i = 0; i < replicaNumber / 4; i++) {
// 对invoker+i进行md5运算,得到一个长度为16个字节数组
byte[] digest = md5(invoker + i);
// 对digest部分字节进行4次hash运算,得到四个不同的long型正整数
for (int h = 0; h < 4; h++) {
// h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进行位运算
// h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进行位运算
// h = 2 时,取 digest 中下标为 8 ~ 11 的4个字节进行位运算
// h = 3 时,取 digest 中下标为 12 ~ 15 的4个字节进行位运算
long m = hash(digest, h);
// 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中,
// virtualInvokers 中的元素要有序,因此选用 TreeMap 作为存储结构
// 得到了160个虚拟节点,存储到了virtualInvokers这个TreeMap中
virtualInvokers.put(m, invoker);//virtualInvokers这个TreeMap中应该有serviceAddresses.size * 160个节点
}
}
}
}


public String select(String rpcServiceKey) {
// 对参数 key 进行 md5 运算
byte[] digest = md5(rpcServiceKey);
// 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法,
// 寻找合适的 serviceAddresses
long hashCode1 = hash(digest, 0);
return selectForKey(hashCode1);
}

public String selectForKey(long hashCode) {
/**
* tailMap()方法与firstEntry()方法表示:
* 到virtualInvokers这个TreeMap中查找第一个节点值大于或等于当前hashCode的节点entry
* virtualInvokers中第一个大于或等于hashCode的节点entry,对应的数据结构是<Long, String>
*/
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();
// 如果hashCode 大于 圆环上最大的位置,此时节点entry = null,需要将 TreeMap 的头结点赋值给 entry
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
// 返回的是String类型的serviceAddresses
return entry.getValue();
}

// 只要知道它是在做运算,返回的是字节数组。模仿dubbo里面的
static byte[] md5(String key) {
MessageDigest md;
try {
md = MessageDigest.getInstance("MD5");
byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
md.update(bytes);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
return md.digest();
}

// 只要知道它是在做运算。dubbo里面的
static long hash(byte[] digest, int idx) {
return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
}
}

举例:

假设从zk下取出的address集合中有两个服务地址

那么virtualInvokers这个集合就可以映射得到 160*2=320个虚拟节点,

所以此时圆环上有320个节点,其中160个指向地址1,160个指向地址2。

使用内部类ConsistentHashSelector的方法其就可以快速找到地址(String类型):

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!

请我喝杯咖啡吧~

支付宝
微信