欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

FlinkRPC源码流程

时间:2023-04-18

需要了解动态反射和RPC

文章目录

概述四个组件启动流程Rpc调用流程 概述

Akka系统的核心ActorSystem和Actor,若需构建一个Akka系统,首先需要创建ActorSystem,创建完ActorSystem后,可通过其创建Actor(注意:Akka不允许直接new一个Actor,只能通过 Akka 提供的某些 API 才能创建或查找 Actor,一般会通过 ActorSystem#actorOf和ActorContext#actorOf来创建 Actor),另外,我们只能通过ActorRef(Actor的引用, 其对原生的 Actor 实例做了良好的封装,外界不能随意修改其内部状态)来与Actor进行通信。
1、ActorSystem 是管理 Actor生命周期的组件, Actor是负责进行通信的组件
2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种
方式可以实现异步通信。
3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处
理,不适合调用会阻塞的处理方法。
4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor。
5、如果一个 Actor 要和另外一个 Actor进行通信,则必须先获取对方 Actor 的ActorRef 对象,然后通过该对象发送消息即可。
6、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步返回处理结果。
7、当在任意地方发现要创建这四个组件的任何一个组件的实例对象的时候,创建成功了之后,都会要去执行他的 onStart() ,在集群启动的源码分析中,其实这些组件的很多的工作流程,都被放在 onStart() 里面。 先执行构造方法,后执行onStart方法。

四个组件

1、RpcGateway 网关(路由),各种其他RPC组件,都是 RpcGateWay 的子类
2、RpcServer RpcService 和 RpcEndpoint 之间的粘合层
3、RpcEndpoint 业务逻辑载体,对应的 Actor 的封装
4、RpcService 对应 ActorSystem 的封装

启动流程

在RpcEndpoint中还定义了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法来执行Rpc调用,值得注意的是在Flink的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint/进行Rpc调用时,其会委托RcpServer进行处理。进入rpcService.startServer(this),在RpcService中调用connect()方法与对端的RpcEndpoint(RpcServer)建立连接,connect()方法根据给的地址返回InvocationHandler(AkkaInvocationHandler或FencedAkkaInvocationHandler,也就是对方的代理) xxxRpcGateWay(例如connect(rpcEndpoint.getAddress())) ,连接后返回一个RpcGateway,即是他的实现类CompletableFuture。AkkaRpcService中封装了ActorSystem,并保存了ActorRef到RpcEndpoint的映射关系,在构造RpcEndpoint时会启动指定rpcEndpoint上的RpcServer,其会根据Endpoint类型(FencedRpcEndpoint或其他)来创建不同的Actor(FencedAkkaRpcActor或AkkaRpcActor),并将RpcEndpoint和Actor对应的ActorRef保存起来,然后使用动态代理创建RpcServer,具体代码如下:

rpcService.startServer(this)

RPCEndpoint:protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService = checkNotNull(rpcService, "rpcService");this.endpointId = checkNotNull(endpointId, "endpointId");//在构造RpcEndpoint时会启动指定rpcEndpoint上的RpcServerthis.rpcServer = rpcService.startServer(this);this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);}

public RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); CompletableFuture terminationFuture = new CompletableFuture<>();final Props akkaRpcActorProps;// 根据RpcEndpoint类型创建不同类型的Propsif (rpcEndpoint instanceof FencedRpcEndpoint) { akkaRpcActorProps = Props.create( FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion(), configuration.getMaximumframesize()); } else { akkaRpcActorProps = Props.create( AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion(), configuration.getMaximumframesize()); } ActorRef actorRef;// 同步块,创建Actor,并获取对应的ActorRefsynchronized (lock) { checkState(!stopped, "RpcService is stopped"); actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId()); actors.put(actorRef, rpcEndpoint); } LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());// 获取Actor的路径final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);final String hostname; Option host = actorRef.path().address().host();if (host.isEmpty()) { hostname = "localhost"; } else { hostname = host.get(); }// 解析该RpcEndpoint实现的所有RpcGateway接口 Set> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); // 额外添加RpcServer和AkkabasedEnpoint类 implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkabasedEndpoint.class);final InvocationHandler akkaInvocationHandler;// 根据不同类型动态创建代理对象if (rpcEndpoint instanceof FencedRpcEndpoint) {// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler = new FencedAkkaInvocationHandler<>( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumframesize(), terminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler = new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumframesize(), terminationFuture); }// Rather than using the System ClassLoader directly, we derive the ClassLoader// from this class 、That works better in cases where Flink runs embedded and all Flink// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader();// 生成RpcServer对象,而后对该server的调用都会进入Handler的invoke方法处理,handler实现了多个接口的方法@SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler);return server; }

调用RpcEndpoint#start;

委托给RpcServer#start;

调用动态代理的AkkaInvocationHandler#invoke;发现调用的是StartStoppable#start方法,则直接进行本地方法调用;invoke方法的代码如下:

RPCEndpoint:public final void start() {rpcServer.start();}AkkaInvocationHandler:public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); Object result;// 先匹配指定类型(handler已实现接口的方法),若匹配成功则直接进行本地方法调用;若匹配为FencedRpcGateway类型,则抛出异常(应该在FencedAkkaInvocationHandler中处理);其他则进行Rpc调用if (declaringClass.equals(AkkabasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if (declaringClass.equals(FencedRpcGateway.class)) {throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + "、This indicates that you retrieved a FencedRpcGateway without specifying a " +"fencing token、Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +"retrieve a properly FencedRpcGateway."); } else { result = invokeRpc(method, args); }return result; }

调用AkkaInvocationHandler#start;

通过ActorRef#tell给对应的Actor发送消息rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());

调用AkkaRpcActor#handleControlMessage处理控制类型消息;

在主线程中将自身状态变更为Started状态;

经过上述步骤就完成了Actor的启动过程,Actor启动后便可与Acto通信让其执行代码(如runSync/callSync等)和处理Rpc请求了。下面分别介绍处理执行代码和处理Rpc请求;

Rpc调用流程

AkkaInvocationHandler#invokeRpc,其方法如下:private Object invokeRpc(Method method, Object[] args) throws Exception {// 获取方法相应的信息String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);// 创建RpcInvocationMessage(可分为LocalRpcInvocation/RemoteRpcInvocation) final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args); Class<?> returnType = method.getReturnType(); final Object result;// 无返回,则使用tell方法if (Objects.equals(returnType, Void.TYPE)) { tell(rpcInvocation); result = null; } else {// execute an asynchronous call// 有返回,则使用ask方法 CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout); CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {// 调用返回后进行反序列化if (o instanceof SerializedValue) {try {return ((SerializedValue<?>) o).deserializevalue(getClass().getClassLoader()); } catch (IOException | ClassNotFoundException e) {throw new CompletionException(new RpcException("Could not deserialize the serialized payload of RPC method : " + methodName, e)); } } else {// 直接返回return o; } });// 若返回类型为CompletableFuture则直接赋值if (Objects.equals(returnType, CompletableFuture.class)) { result = completableFuture; } else {try {// 从CompletableFuture获取 result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit()); } catch (ExecutionException ee) {throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee)); } } }return result; }

然后转到服务器接收

AkkaRpcActor#handleRpcInvocation,其代码如下:

private void handleRpcInvocation(RpcInvocation rpcInvocation) { Method rpcMethod = null;try {// 获取方法的信息 String methodName = rpcInvocation.getMethodName(); Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();// 在RpcEndpoint中找指定方法 rpcMethod = lookupRpcMethod(methodName, parameterTypes); } catch (ClassNotFoundException e) { log.error("Could not load method arguments.", e);// 异常处理 RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); } catch (IOException e) { log.error("Could not deserialize rpc invocation message.", e);// 异常处理 RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); } catch (final NoSuchMethodException e) { log.error("Could not find rpc method for rpc invocation.", e);// 异常处理 RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); }if (rpcMethod != null) {try {// this supports declaration of anonymous classes rpcMethod.setAccessible(true);// 返回类型为空则直接进行invokeif (rpcMethod.getReturnType().equals(Void.TYPE)) {// No return value to send back rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); }else {final Object result;try { result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); }catch (InvocationTargetException e) { log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, e);// tell the sender about the failure getSender().tell(new Status.Failure(e.getTargetException()), getSelf());return; }final String methodName = rpcMethod.getName();// 方法返回类型为CompletableFutureif (result instanceof CompletableFuture) {final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;// 发送结果(使用Patterns发送结果给调用者,并会进行序列化并验证结果大小) sendAsyncResponse(responseFuture, methodName); } else {// 类型非CompletableFuture,发送结果(使用Patterns发送结果给调用者,并会进行序列化并验证结果大小) sendSyncResponse(result, methodName); } } } catch (Throwable e) { log.error("Error while executing remote procedure call {}.", rpcMethod, e);// tell the sender about the failure getSender().tell(new Status.Failure(e), getSelf()); } } }

将结果返回给调用者AkkaInvocationHandler#ask;

补充:

AkkaRpcActor,会根据类型的不同,进行不同的处理

protected void handleRpcMessage(Object message) {// 根据消息类型不同进行不同的处理if (message instanceof RunAsync) { handleRunAsync((RunAsync) message); } else if (message instanceof CallAsync) { handleCallAsync((CallAsync) message); } else if (message instanceof RpcInvocation) { handleRpcInvocation((RpcInvocation) message); } else { log.warn("Received message of unknown type {} with value {}、Dropping this message!", message.getClass().getName(), message); sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +" of type " + message.getClass().getSimpleName() + '.')); } }

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。