本文主要梳理API网关调用后端Dubbo接口的流程。
1、API请求生命周期
2、后端服务(dubbo-proxy)
2.1、Netty实现http服务器
2.1.1、NettyServer
- 参考:org.apache.dubbo.proxy.server.NettyServer📎NettyServer.java
- 在本地启动服务:localIp:port
- 重点在:HttpProcessHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| /**
 * Starts the proxy HTTP server on the {@code serverStartor} executor.
 *
 * <p>The submitted task initialises the Netty bootstrap, binds it to the local IP and the
 * configured port, and then blocks until the server channel is closed. {@code destroy()} is
 * always invoked when the task ends, whether the server closed normally or startup failed.
 */
@PostConstruct
public void start() {
    serverStartor.execute(() -> {
        init();
        String inetHost = InetAddressUtil.getLocalIP();
        boolean interrupted = false;
        try {
            ChannelFuture f = bootstrap.bind(inetHost, port).sync();
            logger.info("Dubbo proxy started, host is {} , port is {}.", inetHost, port);
            // Park this task until the server channel is closed.
            f.channel().closeFuture().sync();
            logger.info("Dubbo proxy closed, host is {} , port is {}.", inetHost, port);
        } catch (InterruptedException e) {
            logger.error("dubbo proxy start failed", e);
            interrupted = true;
        } finally {
            destroy();
            if (interrupted) {
                // Restore the interrupt status only after cleanup, so destroy() is not itself
                // cut short but the executor still observes the interruption.
                Thread.currentThread().interrupt();
            }
        }
    });
}
/**
 * Prepares the Netty server bootstrap used by {@code start()}.
 *
 * <p>Hands the registry to {@code GenericInvoke}, creates the boss and worker event loop
 * groups (one thread per available processor for the boss group, twice that for the worker
 * group) and installs a single {@code HttpProcessHandler} through the channel initializer.
 */
private void init() {
    GenericInvoke.setRegistry(this.registry);

    final int cores = Runtime.getRuntime().availableProcessors();
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(cores, new NamingThreadFactory("Dubbo-Proxy-Boss"));
    workerGroup = new NioEventLoopGroup(cores * 2, new NamingThreadFactory("Dubbo-Proxy-Work"));

    HttpProcessHandler httpHandler =
            new HttpProcessHandler(businessThreadCount, serviceMapping, queryStringHandlerList);

    // Configure the bootstrap step by step; each call returns the same bootstrap instance.
    bootstrap.group(bossGroup, workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.childHandler(new ProxyChannelInitializer(httpHandler));
    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
}
|
2.1.2、HttpProcessHandler
- 📎HttpProcessHandler.java
- 接收HTTP请求,根据path解析必要的参数,如:application、service、methodName等信息
- queryStringHandler.handleRequest(serviceDefinition, params)中根据stage和appId获取groupId和userId
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| /**
 * Turns an incoming HTTP request into a {@code ServiceDefinition} and dispatches it.
 *
 * <p>The request path is expected to look like {@code /{application}/{service}}. The query
 * string may carry {@code group}, {@code version}, {@code methodName} and {@code paramTypes};
 * the request body is parsed as a JSON {@code ServiceDefinition}. Values for the method name
 * and parameter types taken from the query string are only applied when the body did not
 * already provide them. A path without a slash is answered as a health check when it equals
 * {@code HEALTHY_CHECK_URL}.
 *
 * @param ctx channel context used to write the response
 * @param msg the HTTP request
 * @throws IllegalArgumentException if the path does not contain both an application and a
 *         service segment, if the body does not yield a service definition, or if the path
 *         is neither a service path nor the health check path
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(msg.uri());
    String path = queryStringDecoder.rawPath();
    if (path.endsWith("/")) {
        path = path.substring(0, path.length() - 1);
    }
    if (path.startsWith("/")) {
        path = path.substring(1);
    }
    if (path.contains("/")) {
        // Split once and validate: String.split drops trailing empty segments, so a path such
        // as "/app//" yields fewer than two elements and must not be indexed blindly.
        String[] segments = path.split("/");
        if (segments.length < 2) {
            throw new IllegalArgumentException("something was wrong with your request");
        }
        String application = segments[0];
        String service = segments[1];
        String methodName = null;
        String[] paramTypes = null;
        Map<String, List<String>> params = queryStringDecoder.parameters();
        logger.info("queryParams:{}", JSON.toJSONString(params));
        // The service id is assembled as "group/interface:version"; both parts are optional.
        if (params.containsKey("group")) {
            service = params.get("group").get(0) + "/" + service;
        }
        if (params.containsKey("version")) {
            service = service + ":" + params.get("version").get(0);
        }
        if (params.containsKey("methodName")) {
            methodName = params.get("methodName").get(0);
        }
        if (params.containsKey("paramTypes")) {
            @SuppressWarnings("UnstableApiUsage")
            List<String> paramTypesList = queryStringSplitter.splitToList(params.get("paramTypes").get(0));
            if (!paramTypesList.isEmpty()) {
                paramTypes = paramTypesList.toArray(new String[]{});
            }
        }
        ByteBuf raw = msg.content();
        String info = raw.toString(CharsetUtil.UTF_8);
        ServiceDefinition serviceDefinition = JSON.parseObject(info, ServiceDefinition.class);
        if (serviceDefinition == null) {
            // An empty body parses to null; fail with a clear message instead of a
            // NullPointerException on the setter calls below.
            throw new IllegalArgumentException("request body must contain a JSON service definition");
        }
        serviceDefinition.setServiceID(service);
        serviceDefinition.setApplication(application);
        // Query-string values act as defaults only; values from the body take precedence.
        if (methodName != null && serviceDefinition.getMethodName() == null) {
            serviceDefinition.setMethodName(methodName);
        }
        if (paramTypes != null && serviceDefinition.getParamTypes() == null) {
            serviceDefinition.setParamTypes(paramTypes);
        }
        for (QueryStringHandler queryStringHandler : queryStringHandlerList) {
            queryStringHandler.handleRequest(serviceDefinition, params);
        }
        doRequest(ctx, serviceDefinition, msg);
    } else if (HEALTHY_CHECK_URL.equals(path)) {
        ResponseUtil.writeResponse(ctx, ResultGenerator.genSuccessResult(System.currentTimeMillis()), true);
    } else {
        throw new IllegalArgumentException("something was wrong with your request");
    }
}
/**
 * Hands the prepared request over to the business thread pool.
 *
 * @param ctx               channel context the worker uses to write the response
 * @param serviceDefinition the target service, method and arguments to invoke
 * @param msg               the originating HTTP request
 */
private void doRequest(ChannelHandlerContext ctx, ServiceDefinition serviceDefinition, HttpRequest msg) {
    RequestWorker worker = new RequestWorker(serviceDefinition, ctx, msg, serviceMapping);
    businessThreadPool.execute(worker);
}
|
例子:微信客户列表
解析后:
- application:f6-prado
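- service:f6-pre/com.f6car.prado.gateway.api.user.UnionOnlineUserApiGateway(本例的请求参数中没有version,因此不会追加":version"后缀;若传入version,则为 f6-pre/com.f6car.prado.gateway.api.user.UnionOnlineUserApiGateway:${version})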
- methodName:list
- queryParams:
1
| {"group":["f6-pre"],"stage":["pre"],"paramTypes":["com.f6car.prado.gateway.api.user.dto.request.UnionOnlineUserSearchRequest"],"methodName":["list"],"appId":["110903880"],"traceId":["DBB8580E-E2A8-4C44-B224-805A35151AE8"],"proxy":["AliCloudApiGateway"]}
|
参数获取结束后,执行RequestWorker📎RequestWorker.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| /**
 * Validates the request target and performs the generic Dubbo call for this request.
 *
 * <p>The application name, method name, and the interface, group and version derived from
 * the service id are each checked with {@code LegalNameUtil.isLegalName}; a failed check
 * makes {@code Preconditions.checkArgument} throw before any call is attempted. On success
 * the invocation result is wrapped as a success result and written to the channel, honouring
 * the request's keep-alive setting.
 */
@Override
public void run() {
    Preconditions.checkArgument(LegalNameUtil.isLegalName(serviceDefinition.getApplication()));
    Preconditions.checkArgument(LegalNameUtil.isLegalName(serviceDefinition.getMethodName()));
    String serviceID = serviceDefinition.getServiceID();
    // Break the service id down into its interface, group and version parts.
    String interfaze = Tool.getInterface(serviceID);
    String group = Tool.getGroup(serviceID);
    String version = Tool.getVersion(serviceID);
    Preconditions.checkArgument(LegalNameUtil.isLegalName(interfaze));
    Preconditions.checkArgument(LegalNameUtil.isLegalName(group));
    Preconditions.checkArgument(LegalNameUtil.isLegalName(version));
    Object result;
    try {
        result = GenericInvoke.genericCall(interfaze, group, version, serviceDefinition);
        ResponseUtil.writeResponse(ctx, ResultGenerator.genSuccessResult(result), HttpUtil.isKeepAlive(this.msg));
    } catch (Exception e) {
        // NOTE(review): the exception is discarded here and nothing is written to the
        // channel on this path, so as shown the caller gets no response when the call
        // fails. Possibly the error handling was elided from this listing; confirm
        // against the full RequestWorker source.
    }
}
|
2.2、Dubbo泛化调用
📎GenericInvoke.java
GenericInvoke.genericCall(interfaze, group, version, serviceDefinition)
以微信客户列表为例:
- interfaze=com.f6car.prado.gateway.api.user.UnionOnlineUserApiGateway
- group=f6-pre
- version=空字符串
首先初始化com.alibaba.dubbo.config.ApplicationConfig,并为其设置注册中心配置(这里配置了zk为注册中心);通过一个AtomicBoolean标志位保证只初始化一次
增加接口的dubbo服务引用注册,使用ConcurrentHashMap作为本地缓存
设置Dubbo调用上下文(groupId+userId)
使用com.alibaba.dubbo.rpc.service.GenericService#$invoke实现泛化调用(泛化调用的功能是通过扩展实现,有时间再总结)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| /**
 * Invokes a Dubbo service through the generic-invocation API.
 *
 * <p>On first use the shared application configuration is initialised. The reference for
 * the given interface, group and version is then taken from the local cache (or created),
 * any non-empty attachments from the service definition are placed on the RPC context, and
 * the method is invoked via {@code GenericService#$invoke}.
 *
 * @param interfaceName     fully qualified name of the target interface
 * @param group             Dubbo group of the target service
 * @param version           Dubbo version of the target service
 * @param serviceDefinition method name, parameter types, parameter values and attachments
 * @return whatever the generic invocation returns
 */
public static Object genericCall(String interfaceName, String group, String version,
                                 ServiceDefinition serviceDefinition) {
    // Flip the flag only AFTER init() has finished. Setting it first (compareAndSet before
    // init()) lets a concurrent caller skip initialisation and build a reference while
    // applicationConfig is still unset, and leaves the flag stuck if init() throws.
    if (!init.get()) {
        synchronized (GenericInvoke.class) {
            if (!init.get()) {
                init();
                init.set(true);
            }
        }
    }
    ReferenceConfig<GenericService> reference = addNewReference(interfaceName, group, version);
    GenericService svc = reference.get();
    if (serviceDefinition.getAttachments() != null && !serviceDefinition.getAttachments().isEmpty()) {
        RpcContext.getContext().setAttachments(serviceDefinition.getAttachments());
    }
    logger.info("dubbo generic invoke, service is {}, method is {} , paramTypes is {} , paramObjs is {} , svc"
                    + " is {}.", interfaceName, serviceDefinition.getMethodName(),
            serviceDefinition.getParamTypes(), serviceDefinition.getParamValues(), svc);
    return svc.$invoke(serviceDefinition.getMethodName(), serviceDefinition.getParamTypes(),
            serviceDefinition.getParamValues());
}

/**
 * Builds the shared application configuration, pointing it at the registry described by
 * the URL of the configured {@code registry}.
 */
private static void init() {
    RegistryConfig registryConfig = new RegistryConfig();
    registryConfig.setAddress(registry.getUrl().getProtocol() + "://" + registry.getUrl().getAddress());
    registryConfig.setGroup(registry.getUrl().getParameter(Constants.GROUP_KEY));
    applicationConfig = new ApplicationConfig();
    applicationConfig.setName("dubbo-proxy");
    applicationConfig.setRegistry(registryConfig);
}

/**
 * Returns the cached reference for the given interface, group and version, creating and
 * caching one when none exists yet.
 *
 * <p>When two callers race to create the same entry, {@code putIfAbsent} keeps the first
 * one stored and both callers receive that instance.
 */
private static ReferenceConfig<GenericService> addNewReference(String interfaceName, String group,
                                                               String version) {
    String cachedKey = interfaceName + group + version;
    ReferenceConfig<GenericService> reference = cachedConfig.get(cachedKey);
    if (reference == null) {
        ReferenceConfig<GenericService> newReference = initReference(interfaceName, group, version);
        ReferenceConfig<GenericService> oldReference = cachedConfig.putIfAbsent(cachedKey, newReference);
        reference = (oldReference != null) ? oldReference : newReference;
    }
    return reference;
}
/**
 * Creates a generic reference configuration for one interface/group/version combination.
 *
 * @param interfaceName fully qualified name of the target interface
 * @param group         Dubbo group of the target service
 * @param version       Dubbo version of the target service
 * @return a new, not yet cached reference configuration bound to the shared application config
 */
private static ReferenceConfig<GenericService> initReference(String interfaceName, String group,
                                                             String version) {
    final ReferenceConfig<GenericService> genericRef = new ReferenceConfig<>();
    // Generic mode: the service is called through GenericService, not its own interface type.
    genericRef.setGeneric(true);
    genericRef.setApplication(applicationConfig);
    genericRef.setGroup(group);
    genericRef.setVersion(version);
    genericRef.setInterface(interfaceName);
    return genericRef;
}
|