本文主要梳理API网关调用后端Dubbo接口的流程。

1、API请求生命周期

img

2、后端服务(dubbo-proxy)

2.1、Netty实现http服务器

2.1.1、NettyServer

  • 参考:org.apache.dubbo.proxy.server.NettyServer📎NettyServer.java
  • 在本地启动服务:localIp:port
  • 重点在:HttpProcessHandler
NettyServer的启动与初始化代码如下:
/**
 * Starts the proxy's HTTP server on the {@code serverStartor} executor.
 *
 * <p>The submitted task initialises the Netty bootstrap, binds it to the local IP and the
 * configured port, and then blocks until the server channel is closed. {@code destroy()} is
 * always invoked when the task ends, whether it ends normally or through an exception.
 */
@PostConstruct
public void start() {
    serverStartor.execute(() -> {
        init();
        String inetHost = InetAddressUtil.getLocalIP();
        try {
            ChannelFuture f = bootstrap.bind(inetHost, port).sync();
            logger.info("Dubbo proxy started, host is {} , port is {}.", inetHost, port);
            // Park this thread until the server channel is closed.
            f.channel().closeFuture().sync();
            logger.info("Dubbo proxy closed, host is {} , port is {}.", inetHost, port);
        } catch (InterruptedException e) {
            logger.error("dubbo proxy start failed", e);
            // Restore the interrupt status so the owning executor can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            destroy();
        }
    });
}

/**
 * Prepares the Netty server bootstrap used by {@code start()}.
 *
 * <p>Hands the registry to {@code GenericInvoke}, creates the boss and worker event loop groups
 * (sized to the CPU count and twice the CPU count respectively), and wires a single
 * {@code HttpProcessHandler} into every child channel through {@code ProxyChannelInitializer}.
 */
private void init() {
    GenericInvoke.setRegistry(this.registry);
    bootstrap = new ServerBootstrap();

    int cores = Runtime.getRuntime().availableProcessors();
    bossGroup = new NioEventLoopGroup(cores, new NamingThreadFactory("Dubbo-Proxy-Boss"));
    workerGroup = new NioEventLoopGroup(cores * 2, new NamingThreadFactory("Dubbo-Proxy-Work"));

    HttpProcessHandler httpHandler =
            new HttpProcessHandler(businessThreadCount, serviceMapping, queryStringHandlerList);
    ProxyChannelInitializer channelInitializer = new ProxyChannelInitializer(httpHandler);

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(channelInitializer)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
}

2.1.2、HttpProcessHandler

  • 📎HttpProcessHandler.java
  • 接收HTTP请求,根据path和查询参数解析必要的信息,如:application、service、methodName等
  • queryStringHandler.handleRequest(serviceDefinition, params)中根据stage和appId获取groupId和userId
HttpProcessHandler的核心代码如下:
/**
 * Translates an incoming HTTP request into a {@code ServiceDefinition} and dispatches it.
 *
 * <p>The path is expected to look like {@code /{application}/{service}}. The optional query
 * parameters {@code group}, {@code version}, {@code methodName} and {@code paramTypes} refine the
 * call; the request body is parsed as JSON into the {@code ServiceDefinition}. Values already
 * present in the body take precedence over {@code methodName}/{@code paramTypes} from the query
 * string. A path equal to {@code HEALTHY_CHECK_URL} is answered directly with the current time.
 *
 * @param ctx channel context used to write the response
 * @param msg the aggregated HTTP request
 * @throws IllegalArgumentException if the path has no usable application/service segments, if
 *         the body does not yield a service definition, or if the path is neither a service path
 *         nor the health-check path
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {

    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(msg.uri());
    String path = queryStringDecoder.rawPath();
    // Normalise to "application/service" by trimming one trailing and one leading slash.
    if (path.endsWith("/")) {
        path = path.substring(0, path.length() - 1);
    }
    if (path.startsWith("/")) {
        path = path.substring(1);
    }
    if (path.contains("/")) {
        String[] segments = path.split("/");
        // split() drops trailing empty segments, so "app//" would otherwise fail on segments[1].
        if (segments.length < 2) {
            throw new IllegalArgumentException("something was wrong with your request");
        }
        String application = segments[0];
        String service = segments[1];
        String methodName = null;
        String[] paramTypes = null;
        Map<String, List<String>> params = queryStringDecoder.parameters();
        logger.info("queryParams:{}", JSON.toJSONString(params));
        // Build the service ID as "group/interface:version" from the optional query parameters.
        if (params.containsKey("group")) {
            service = params.get("group").get(0) + "/" + service;
        }
        if (params.containsKey("version")) {
            service = service + ":" + params.get("version").get(0);
        }
        if (params.containsKey("methodName")) {
            methodName = params.get("methodName").get(0);
        }
        if (params.containsKey("paramTypes")) {
            @SuppressWarnings("UnstableApiUsage")
            List<String> paramTypesList = queryStringSplitter.splitToList(params.get("paramTypes").get(0));
            if (!paramTypesList.isEmpty()) {
                paramTypes = paramTypesList.toArray(new String[]{});
            }
        }
        ByteBuf raw = msg.content();
        String info = raw.toString(CharsetUtil.UTF_8);
        ServiceDefinition serviceDefinition = JSON.parseObject(info, ServiceDefinition.class);
        // The JSON parser can yield null (e.g. for an empty body); fail with a clear message
        // instead of a NullPointerException on the setters below.
        if (serviceDefinition == null) {
            throw new IllegalArgumentException("request body must be a JSON service definition");
        }
        serviceDefinition.setServiceID(service);
        serviceDefinition.setApplication(application);
        // Query-string values are only a fallback for fields missing from the body.
        if (methodName != null && serviceDefinition.getMethodName() == null) {
            serviceDefinition.setMethodName(methodName);
        }
        if (paramTypes != null && serviceDefinition.getParamTypes() == null) {
            serviceDefinition.setParamTypes(paramTypes);
        }
        for (QueryStringHandler queryStringHandler : queryStringHandlerList) {
            queryStringHandler.handleRequest(serviceDefinition, params);
        }
        doRequest(ctx, serviceDefinition, msg);
    } else if (HEALTHY_CHECK_URL.equals(path)) {
        ResponseUtil.writeResponse(ctx, ResultGenerator.genSuccessResult(System.currentTimeMillis()), true);
    } else {
        throw new IllegalArgumentException("something was wrong with your request");
    }
}

/**
 * Hands the parsed request over to the business thread pool.
 *
 * @param ctx               channel context the worker will respond on
 * @param serviceDefinition the fully populated call description
 * @param msg               the originating HTTP request
 */
private void doRequest(ChannelHandlerContext ctx, ServiceDefinition serviceDefinition, HttpRequest msg) {
    RequestWorker worker = new RequestWorker(serviceDefinition, ctx, msg, serviceMapping);
    businessThreadPool.execute(worker);
}

例子:微信客户列表

img

解析后:

  • application:f6-prado
  • service:f6-pre/com.f6car.prado.gateway.api.user.UnionOnlineUserApiGateway:${version}
  • methodName:list
  • queryParams:
{"group":["f6-pre"],"stage":["pre"],"paramTypes":["com.f6car.prado.gateway.api.user.dto.request.UnionOnlineUserSearchRequest"],"methodName":["list"],"appId":["110903880"],"traceId":["DBB8580E-E2A8-4C44-B224-805A35151AE8"],"proxy":["AliCloudApiGateway"]}

参数解析结束后,将请求提交到业务线程池,由RequestWorker执行(📎RequestWorker.java)

RequestWorker的run方法如下:
/**
 * Validates the requested application, method, interface, group and version names, performs the
 * generic Dubbo call, and writes the successful result back on the channel.
 */
@Override
public void run() {
    Preconditions.checkArgument(LegalNameUtil.isLegalName(serviceDefinition.getApplication()));
    Preconditions.checkArgument(LegalNameUtil.isLegalName(serviceDefinition.getMethodName()));

    // The service ID carries interface, group and version; pull the three parts out.
    final String serviceId = serviceDefinition.getServiceID();
    final String interfaceName = Tool.getInterface(serviceId);
    final String groupName = Tool.getGroup(serviceId);
    final String versionName = Tool.getVersion(serviceId);

    Preconditions.checkArgument(LegalNameUtil.isLegalName(interfaceName));
    Preconditions.checkArgument(LegalNameUtil.isLegalName(groupName));
    Preconditions.checkArgument(LegalNameUtil.isLegalName(versionName));

    try {
        Object invocationResult =
                GenericInvoke.genericCall(interfaceName, groupName, versionName, serviceDefinition);
        boolean keepAlive = HttpUtil.isKeepAlive(this.msg);
        ResponseUtil.writeResponse(ctx, ResultGenerator.genSuccessResult(invocationResult), keepAlive);
    } catch (Exception e) {
        // omitted
    }
}

2.2、Dubbo泛化调用

  • 📎GenericInvoke.java

  • GenericInvoke.genericCall(interfaze, group, version, serviceDefinition)

  • 以微信客户列表为例:

    • interfaze=com.f6car.prado.gateway.api.user.UnionOnlineUserApiGateway
    • group=f6-pre
    • version=空字符串
  • 首先初始化com.alibaba.dubbo.config.ApplicationConfig,这里配置了zk为注册中心(使用AtomicBoolean.compareAndSet保证不会重复初始化)

  • 增加接口的dubbo服务引用注册,使用ConcurrentHashMap作为本地缓存

  • 设置Dubbo调用上下文(groupId+userId)

  • 使用com.alibaba.dubbo.rpc.service.GenericService#$invoke实现泛化调用(泛化调用的功能是通过扩展实现,有时间再总结)

GenericInvoke的核心代码如下:
/**
 * Invokes a Dubbo service through its generic ({@code GenericService}) reference.
 *
 * @param interfaceName     interface name of the target service
 * @param group             Dubbo group of the target service
 * @param version           Dubbo version of the target service
 * @param serviceDefinition carries the method name, parameter types, parameter values and
 *                          optional attachments for the call
 * @return whatever {@code GenericService#$invoke} returns
 */
public static Object genericCall(String interfaceName, String group,
                                 String version, ServiceDefinition serviceDefinition) {
    // One-time setup of the shared application config.
    // NOTE(review): a second thread can pass this guard while init() is still running in the
    // first one - confirm whether that window matters here.
    if (init.compareAndSet(false, true)) {
        init();
    }

    ReferenceConfig<GenericService> referenceConfig = addNewReference(interfaceName, group, version);
    GenericService genericService = referenceConfig.get();

    boolean hasAttachments = serviceDefinition.getAttachments() != null
            && !serviceDefinition.getAttachments().isEmpty();
    if (hasAttachments) {
        RpcContext.getContext().setAttachments(serviceDefinition.getAttachments());
    }

    logger.info("dubbo generic invoke, service is {}, method is {} , paramTypes is {} , paramObjs is {} , svc is {}.",
            interfaceName,
            serviceDefinition.getMethodName(),
            serviceDefinition.getParamTypes(),
            serviceDefinition.getParamValues(),
            genericService);

    return genericService.$invoke(
            serviceDefinition.getMethodName(),
            serviceDefinition.getParamTypes(),
            serviceDefinition.getParamValues());
}
/**
 * Builds the shared {@code ApplicationConfig} (named {@code dubbo-proxy}) whose registry address
 * and group are taken from the URL of the injected {@code registry}.
 *
 * <p>The config is fully populated in a local variable and only then stored in the static field,
 * so code reading {@code applicationConfig} never observes an instance that is still missing its
 * name or registry.
 */
private static void init() {
    RegistryConfig registryConfig = new RegistryConfig();
    registryConfig.setAddress(registry.getUrl().getProtocol() + "://" + registry.getUrl().getAddress());
    registryConfig.setGroup(registry.getUrl().getParameter(Constants.GROUP_KEY));

    ApplicationConfig config = new ApplicationConfig();
    config.setName("dubbo-proxy");
    config.setRegistry(registryConfig);
    // Publish last: assigning before configuration would expose a half-built object.
    applicationConfig = config;
}
/**
 * Returns the cached generic reference for the given interface/group/version, creating and
 * caching it on first use.
 *
 * @param interfaceName interface name of the target service
 * @param group         Dubbo group of the target service
 * @param version       Dubbo version of the target service
 * @return the single cached {@code ReferenceConfig} for this combination
 */
private static ReferenceConfig<GenericService> addNewReference(String interfaceName,
                                                               String group, String version) {
    // NOTE(review): the key is a plain concatenation without separators, so different
    // (interface, group, version) triples could in theory map to the same key - confirm
    // whether other code depends on this key format before changing it.
    String cachedKey = interfaceName + group + version;

    // Fast path: a plain read avoids any locking for references that already exist.
    ReferenceConfig<GenericService> cached = cachedConfig.get(cachedKey);
    if (cached != null) {
        return cached;
    }
    // Slow path: create the reference at most once per key, even under concurrent first calls.
    return cachedConfig.computeIfAbsent(cachedKey, key -> initReference(interfaceName, group, version));
}

/**
 * Creates a new, not yet cached, generic reference configuration for the given service.
 *
 * @param interfaceName interface name of the target service
 * @param group         Dubbo group to set on the reference
 * @param version       Dubbo version to set on the reference
 * @return a {@code ReferenceConfig} with generic mode enabled, bound to the shared application config
 */
private static ReferenceConfig<GenericService> initReference(String interfaceName, String group,
                                                             String version) {
    ReferenceConfig<GenericService> genericReference = new ReferenceConfig<>();
    genericReference.setApplication(applicationConfig);
    genericReference.setInterface(interfaceName);
    genericReference.setGroup(group);
    genericReference.setVersion(version);
    genericReference.setGeneric(true);
    return genericReference;
}