rpc框架之SPI机制源码分析

基于dubbo实现rpc框架,分析SPI机制,并实现一个

参考:http://tianxiaobo.com/2018/10/01/Dubbo-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-SPI-%E6%9C%BA%E5%88%B6/

javaguide-rpc

简介:

SPI 全称为 Service Provider Interface,是一种服务发现机制。SPI的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口加载实现类。 正因此特性,我们可以很容易的通过SPI 机制为我们的程序提供拓展功能。SPI 机制在第三方框架中也有所应用,比如 Dubbo 就是通过 SPI机制加载所有的组件。不过,Dubbo 并未使用 Java 原生的 SPI 机制,而是对其进行了增强,使其能够更好的满足需求。

本框架SPI示例:

并未使用 Java SPI,而是重新实现了一套SPI 机制。SPI 的相关逻辑被封装在了
ExtensionLoader 类中,通过 ExtensionLoader,我们可以加载指定的实现类。我们将接口的实现类的全限定名配置在文件中, 放置在META-INF/extensions路径下,下面来看一下配置内容。

  • 还需要在接口上使用@SPI注解。

简单使用:

那么在RpcMessageDecoder.java中就可以直接创建一个Compress类了,使用对象中的方法。compressName=gzip

源码分析:

我们首先通过 ExtensionLoader 的 getExtensionLoader 方法获取一个 ExtensionLoader
实例,然后再通过 ExtensionLoader 的 getExtension 方法获取拓展类对象。这其中,getExtensionLoader用于从缓存中获取与拓展类对应的 ExtensionLoader,若缓存未命中,则创建一个新的实例。

那我们稍微了解下SPI机制,实现下大致功能。

Holder.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.aaron.extension;

// Holder仅用于持有对象目标,没有其他逻辑
public class Holder<T> {
/**
* volatile修饰一个对象的话其实是保证这个对象的引用的可见性,而不是保证这个对象的内容的可见性。
*/
private volatile T value;

public T get() {
return value;
}

public void set(T value) {
this.value = value;
}
}

ExtensionLoader.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package com.aaron.extension;


import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* 参考的是dubbo spi机制。
*/
@Slf4j
public final class ExtensionLoader<T> {
// 配置文件path
private static final String SERVICE_DIRECTORY = "META-INF/extensions/";
// 保存的是ExtensionLoader实例,一个拓展类就有一个ExtensionLoader实例吗?
private static final Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();
// Extension对象,就是拓展类,key:接口类型,value:实例化对象
private static final Map<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();
// 某个接口类
private final Class<?> type;
// 实例化的拓展对象,key:String, value:Holder<Object>对象
private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
// 缓存中保存了实例化的拓展类
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();

// 构造方法
private ExtensionLoader(Class<?> type) {
this.type = type;
}

/**
* 获取ExtensionLoader实例
* 首先从缓存中获取与拓展类对应的 ExtensionLoader,若缓存未命中,则创建一个新的实例。
*/
public static <S> ExtensionLoader<S> getExtensionLoader(Class<S> type) {
// 查看这个类要获取实例时(也就是拓展类),符不符合规则。
if (type == null) {
throw new IllegalArgumentException("Extension type should not be null.");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type must be an interface.");
}
if (type.getAnnotation(SPI.class) == null) {
throw new IllegalArgumentException("Extension type must be annotated by @SPI");
}
// 首先在cache中查找,与之对应的实例化对象
ExtensionLoader<S> extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type);
// 如果cache没有命中,那么就创建一个新的实例。创建完成之后,再在EXTENSION_LOADERS中获取。
// putIfAbsent()方法:如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null
if (extensionLoader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<S>(type));
extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type);
}
return extensionLoader;
}

/**
* 通过ExtensionLoader实例 的getExtension方法获取拓展类对象.
* 首先检查缓存,缓存未命中则创建拓展对象。
*/
public T getExtension(String name) {
// 传入的参数就是:全限定的接口名
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("Extension name should not be null or empty.");
}
// Holder仅用于持有目标对象,没其他什么逻辑
Holder<Object> holder = cachedInstances.get(name);

if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<>());
holder = cachedInstances.get(name);
}
// 没有拓展实例不存在的话,就创建一个实例
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建拓展实例
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}

// 创建拓展对象实例
private T createExtension(String name) {
// 从配置文件中加载所有的拓展类,形成配置项名称到配置类的映射关系。得到的是类型
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw new RuntimeException("No such extension of name " + name);
}
// 在缓存中判断是否有这个拓展类对象实例
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
try {
// 创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
// 创建完再获取
instance = (T) EXTENSION_INSTANCES.get(clazz);
} catch (Exception e) {
log.error(e.getMessage());
}
}
return instance;
}

/**
* 我们在通过名称获取拓展类之前,
* 首先需要根据配置文件解析出名称到拓展类的映射,也就是 Map<名称, 拓展类>。
* 之后再从 Map 中取出相应的拓展类即可
*/
// 这里也是先检查缓存,若缓存未命中,则通过 synchronized 加锁。
// 加锁后再次检查缓存,并判空。此时如果 classes 仍为 null,则加载拓展类。以上代码的写法是典型的双重检查锁.
private Map<String, Class<?>> getExtensionClasses() {
// 从缓存中获取 拓展类(class)
Map<String, Class<?>> classes = cachedClasses.get();
// double check
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = new HashMap<>();
// load all extensions from our extensions directory
// 从指定文件夹配置文件中 加载所有的拓展类
loadDirectory(classes);
cachedClasses.set(classes);
}
}
}
return classes;
}

// 从指定文件夹配置文件中 加载所有的拓展类
private void loadDirectory(Map<String, Class<?>> extensionClasses) {
// fileName = 文件夹路径 + type 全限定名。所以文件名必须以type的全限定名命名
String fileName = ExtensionLoader.SERVICE_DIRECTORY + type.getName();
try {
Enumeration<URL> urls;
ClassLoader classLoader = ExtensionLoader.class.getClassLoader();
// 根据文件名加载所有的同名文件
urls = classLoader.getResources(fileName);
//
if (urls != null) {
while (urls.hasMoreElements()) {
URL resourceUrl = urls.nextElement();
// 加载资源
loadResource(extensionClasses, classLoader, resourceUrl);
}
}
} catch (IOException e) {
log.error(e.getMessage());
}
}

// 加载资源,读取所有的配置文件信息,并全部加载到extensionClasses中
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, URL resourceUrl) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceUrl.openStream(), UTF_8))) {

String line;
// read every line,按行读取配置内容
while ((line = reader.readLine()) != null) {
// get index of comment
final int ci = line.indexOf('#');
if (ci >= 0) {
// 截取#之前的字符串,#之后的内容为注释,忽略
line = line.substring(0, ci);
}
// 去除首位空格
line = line.trim();
if (line.length() > 0) {
try {
final int ei = line.indexOf('=');
// 以 = 为界,截取键与值。比如 dubbo=com.alibaba.compress.DubboProtocol
String name = line.substring(0, ei).trim();
String clazzName = line.substring(ei + 1).trim();
// 我们的SPI使用键值对,因此它们两个都不能是空的
if (name.length() > 0 && clazzName.length() > 0) {
Class<?> clazz = classLoader.loadClass(clazzName);
extensionClasses.put(name, clazz);
}
} catch (ClassNotFoundException e) {
log.error(e.getMessage());
}
}
}

} catch (IOException e) {
log.error(e.getMessage());
}
}
}

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!

请我喝杯咖啡吧~

支付宝
微信