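Preface
As of 2023-07-09, on the dev branch (the latest DolphinScheduler 3.x code), DolphinScheduler contains a simple RPC framework built on Netty, but it does not use it, or more precisely, what it uses is not the complete RPC framework. Much of the code sends network requests directly through the Netty client, without a dynamic proxy to simplify or hide the communication details, even though a complete implementation already exists in the org.apache.dolphinscheduler.rpc package.
This article analyzes the complete RPC implementation in the org.apache.dolphinscheduler.rpc package. DolphinScheduler itself does not use it, but the code follows the same general pattern as other RPC frameworks.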
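Source Code Analysis
RPC Communication Protocol
The protocol header is defined in the org.apache.dolphinscheduler.rpc.protocol.MessageHeader class. There is little to say about it; it follows the usual pattern for such headers.
- a one-byte version
- a one-byte eventType: HEARTBEAT, REQUEST, or RESPONSE
- a four-byte msgLength
- ……
- a one-byte serialization type: DolphinScheduler currently implements only one, based on ProtoStuff.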
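    public class MessageHeader {

        // Protocol version
        private byte version = 1;

        // Event type: HEARTBEAT, REQUEST, or RESPONSE
        private byte eventType;

        // Length of the message body in bytes
        private int msgLength = 0;

        // Identifier that ties a response to its request
        private long requestId = 0L;

        // Serialization type
        private byte serialization = 0;

        // Magic number that marks the start of a frame
        private short magic = RpcProtocolConstants.MAGIC;
    }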
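To make the header layout concrete, here is a minimal sketch of writing and reading such a header with java.nio.ByteBuffer. The field order on the wire, the magic value 0xBABE, and the class name HeaderCodecSketch are all assumptions made for illustration; the real encoder in DolphinScheduler may lay the fields out differently.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for a header with the same fields as MessageHeader.
// The wire order used here is an assumption, not DolphinScheduler's actual layout.
final class HeaderCodecSketch {

    // Assumed magic value for illustration only (the real one is RpcProtocolConstants.MAGIC).
    static final short MAGIC = (short) 0xBABE;

    // 2 (magic) + 1 (version) + 1 (eventType) + 1 (serialization) + 8 (requestId) + 4 (msgLength)
    static final int HEADER_LENGTH = 17;

    // Builds one frame: the fixed-size header followed by the body bytes.
    static byte[] encode(byte version, byte eventType, byte serialization,
                         long requestId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putShort(MAGIC);
        buf.put(version);
        buf.put(eventType);
        buf.put(serialization);
        buf.putLong(requestId);
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    // Reads the request id at its fixed offset (2 bytes magic + 3 single-byte fields).
    static long readRequestId(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong(5);
    }

    // Validates the magic number and the declared length, then returns the body.
    static byte[] decodeBody(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getShort() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        buf.position(5);   // skip version, eventType, serialization
        buf.getLong();     // skip requestId
        int length = buf.getInt();
        if (length != buf.remaining()) {
            throw new IllegalArgumentException("body length mismatch");
        }
        byte[] body = new byte[length];
        buf.get(body);
        return body;
    }
}
```

A fixed-size header with an explicit body length is what lets a decoder split a TCP byte stream back into individual messages.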
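Network Communication over Netty
Encoding, decoding, and the heartbeat mechanism are boilerplate and are not covered here. The core logic lives in the Netty handlers: org.apache.dolphinscheduler.rpc.remote.NettyClientHandler and org.apache.dolphinscheduler.rpc.remote.NettyServerHandler.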
    public RpcResponse sendMsg(Host host, RpcProtocol<RpcRequest> protocol, Boolean async) {
        Channel channel = getChannel(host);
        assert channel != null;

        RpcRequest request = protocol.getBody();
        RpcRequestCache rpcRequestCache = new RpcRequestCache();
        String serviceName = request.getClassName() + request.getMethodName();
        rpcRequestCache.setServiceName(serviceName);
        long reqId = protocol.getMsgHeader().getRequestId();

        // Synchronous call: create a future that the caller blocks on below.
        RpcFuture future = null;
        if (Boolean.FALSE.equals(async)) {
            future = new RpcFuture(request, reqId);
            rpcRequestCache.setRpcFuture(future);
        }

        // Register the pending request under its request id, then send it.
        RpcRequestTable.put(protocol.getMsgHeader().getRequestId(), rpcRequestCache);
        channel.writeAndFlush(protocol);

        RpcResponse result = null;
        // Asynchronous call: return a placeholder success response immediately.
        if (Boolean.TRUE.equals(async)) {
            result = new RpcResponse();
            result.setStatus((byte) 0);
            result.setResult(true);
            return result;
        }

        // Synchronous call: wait for the response.
        try {
            assert future != null;
            result = future.get();
        } catch (InterruptedException e) {
            log.error("send msg error,service name is {}", serviceName, e);
            Thread.currentThread().interrupt();
        }
        return result;
    }
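The method above relies on a table of pending requests keyed by request id: the sender registers a future, and whoever receives the response completes it. The sketch below shows that correlation pattern in isolation. It swaps in the JDK's CompletableFuture for RpcFuture and a ConcurrentHashMap for RpcRequestTable, and the class name PendingRequestsSketch is hypothetical; it is not DolphinScheduler's code.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the request table plus future used by sendMsg.
final class PendingRequestsSketch {

    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sender before the request is written to the channel.
    CompletableFuture<String> register(long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called by the inbound handler when a response carrying this id arrives.
    // Returns false if no caller is waiting for that id (unknown or already answered).
    boolean complete(long requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) throws Exception {
        PendingRequestsSketch table = new PendingRequestsSketch();
        CompletableFuture<String> future = table.register(1L);

        // Simulate the network thread delivering the response later.
        new Thread(() -> table.complete(1L, "pong")).start();

        // The calling thread blocks here, with a timeout so it cannot hang forever.
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}
```

Unlike the bare future.get() in sendMsg, the usage here waits with a timeout, which is one way to keep a lost response from blocking the caller indefinitely.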
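Dynamic Proxy
DolphinScheduler uses the ByteBuddy framework to create dynamic proxies on the client side. The proxy performs the actual network request and hides the related details from the caller.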
    public class RpcClient implements IRpcClient {

        @Override
        public <T> T create(Class<T> clazz, Host host) throws Exception {
            return new ByteBuddy()
                    .subclass(clazz)
                    .method(isDeclaredBy(clazz))
                    .intercept(MethodDelegation.to(new ConsumerInterceptor(host)))
                    .make()
                    .load(getClass().getClassLoader())
                    .getLoaded()
                    .getDeclaredConstructor().newInstance();
        }
    }
    @RuntimeType
    public Object intercept(@AllArguments Object[] args, @Origin Method method) throws RemotingException {
        RpcRequest request = buildReq(args, method);

        String serviceName = method.getDeclaringClass().getSimpleName() + method.getName();

        // Look up the per-method configuration, caching it on first use.
        ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
        if (null == consumerConfig) {
            consumerConfig = cacheServiceConfig(method, serviceName);
        }
        boolean async = consumerConfig.getAsync();

        int retries = consumerConfig.getRetries();

        RpcProtocol<RpcRequest> protocol = buildProtocol(request);

        // Send up to `retries` times and return the first successful result.
        while (retries-- > 0) {
            RpcResponse rsp;
            rsp = nettyClient.sendMsg(host, protocol, async);
            if (null != rsp && rsp.getStatus() == 0) {
                return rsp.getResult();
            }
        }
        throw new RemotingException("send msg error");
    }
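The same idea can be shown without any third-party dependency. The sketch below uses the JDK's built-in java.lang.reflect.Proxy instead of ByteBuddy: each call on the interface is turned into a request object and handed to a transport function, with a simple retry loop. The names JdkProxySketch, GreetingService, and Request are hypothetical, and the transport is a plain Function rather than a Netty client.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Sketch of a client-side RPC proxy built on java.lang.reflect.Proxy (not ByteBuddy).
final class JdkProxySketch {

    // Hypothetical service interface, standing in for something like IUserService.
    interface GreetingService {
        String greet(String name);
    }

    // Hypothetical request: the data a remote side would need to dispatch the call.
    static final class Request {
        final String className;
        final String methodName;
        final Object[] args;

        Request(String className, String methodName, Object[] args) {
            this.className = className;
            this.methodName = methodName;
            this.args = args;
        }
    }

    // Creates a proxy whose methods are forwarded to `transport`, trying up to `retries` times.
    // Note: Object methods such as toString() are forwarded too; a real client would special-case them.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, Function<Request, Object> transport, int retries) {
        InvocationHandler handler = (proxy, method, args) -> {
            Request request = new Request(iface.getSimpleName(), method.getName(), args);
            RuntimeException last = null;
            for (int attempt = 0; attempt < retries; attempt++) {
                try {
                    return transport.apply(request);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw new IllegalStateException("send msg error", last);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        // A fake transport that "executes" the call locally instead of going over the network.
        GreetingService service = create(GreetingService.class,
                req -> "hello, " + req.args[0], 3);
        System.out.println(service.greet("world"));
    }
}
```

java.lang.reflect.Proxy only works with interfaces, whereas ByteBuddy's subclass(...) can also generate subclasses of concrete classes. For an interface-only API such as the one in this article, either approach hides the network call behind an ordinary method call.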
Service Discovery
DolphinScheduler defines two annotations, @RpcService("IUserService") and @Rpc(async = true, serviceCallback = UserCallback.class), to simplify RPC configuration and service discovery.
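As a rough illustration of how annotation-driven registration can work, the sketch below declares two annotations loosely analogous to @RpcService and @Rpc and reads them via reflection. Everything in it is hypothetical: the annotation names ServiceName and CallOptions, their attributes and defaults, and the registry are stand-ins, not DolphinScheduler's definitions.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical annotations and scanning logic; names and defaults are assumptions.
final class AnnotationScanSketch {

    // Marks an implementation class and gives the name it is published under.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ServiceName {
        String value();
    }

    // Per-method call options read by the client side.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface CallOptions {
        boolean async() default false;
        int retries() default 3;
    }

    interface DemoService {
        @CallOptions(async = true)
        Boolean say(String msg);

        Boolean callBackIsFalse(String msg);
    }

    @ServiceName("DemoService")
    static class DemoServiceImpl implements DemoService {
        @Override
        public Boolean say(String msg) {
            return true;
        }

        @Override
        public Boolean callBackIsFalse(String msg) {
            return false;
        }
    }

    // Provider side: index implementations by the name in their annotation.
    static Map<String, Object> register(Object... implementations) {
        Map<String, Object> registry = new LinkedHashMap<>();
        for (Object impl : implementations) {
            ServiceName name = impl.getClass().getAnnotation(ServiceName.class);
            if (name == null) {
                throw new IllegalArgumentException(impl.getClass() + " is not annotated");
            }
            registry.put(name.value(), impl);
        }
        return registry;
    }

    // Consumer side: a method is async only if its annotation says so.
    static boolean isAsync(Class<?> iface, String methodName) {
        for (Method method : iface.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                CallOptions options = method.getAnnotation(CallOptions.class);
                return options != null && options.async();
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }
}
```

The RUNTIME retention policy is what makes the annotations visible to reflection; with the default CLASS retention, getAnnotation would return null.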
Demo
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    // Imports for the DolphinScheduler classes are omitted, as in the original listing.

    public class RpcTest {

        private NettyServer nettyServer;

        private IUserService userService;

        private Host host;

        @BeforeEach
        public void before() throws Exception {
            // Start the server, then create a client-side proxy for IUserService.
            nettyServer = new NettyServer(new NettyServerConfig());
            IRpcClient rpcClient = new RpcClient();
            host = new Host("127.0.0.1", 12346);
            userService = rpcClient.create(IUserService.class, host);
        }

        @Test
        public void callTest() {
            Boolean hello = userService.say("hello");
            System.out.printf("Rpc Call Result %s\n", hello);

            System.out.println("###############");
            System.out.println(userService.callBackIsFalse("hello"));
        }

        @AfterEach
        public void after() {
            NettyClient.getInstance().close();
            nettyServer.close();
        }
    }