wget https://dlcdn.apache.org/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgztar -xzvf flink-1.13.5-bin-scala_2.11.tgzcd flink-1.13.5/mkdir usrlibcp ./examples/streaming/TopSpeedWindowing.jar usrlib/./bin/standalone-job.sh start -Dscheduler-mode=reactive -Dexecution.checkpointing.interval="10s" -j org.apache.flink.streaming.examples.windowing.TopSpeedWindowing./bin/taskmanager.sh start
运行上面的脚本,便启动了一个 standalone 模式的 flink 集群并且提交了一个实时运行的作业,参数 -Dscheduler-mode=reactive 指定了 flink 作业以 reactive 模式运行。
Reactive 模式运行的 flink 作业有以下注意事项:
作业以 reactive 模式提交时,作业默认占有集群中的所有TaskManager节点,所以这种方式适合 per-job 而非yarn-session 的运行模式
通过动态(还是手动的方式)启动和停止TaskManager 节点来达到增加或减少 flink 作业运行所需要的资源
资源伸缩时(TM节点增加或者减少时),flink 作业自动重启
reactive 模式只支持 standalone 方式部署
二、flink master 节点进程中运行的相关组件(本篇涉及到的)
RestServer
Dispatcher
JobMaster
三、Reactive 模式作业提交与执行流程 执行启动 standalone 集群脚本创建一个 flink standalone 集群提交作业RestServer 接受客户端的作业提交请求,开始处理客户端生成的 JobGraphRestServer 通过 Rpc 方式与 Dispatcher 组件进行通信,将 JobGraph· 交给 Dispatcher 处理Dispatcher 将会创建一个 JobMaster 对象用来负责 JobGraph 的执行与调度JobMaster 会创建一个调度器(AdaptiveScheduler对象)用于作业生命周期的管理 四、AdaptiveScheduler 运行原理源码对应于 flink-runtime 模块中的 org.apache.flink.runtime.scheduler.adaptive 包
1、Reactive 模式调度器所涉及到的类:
AdaptiveScheduler
AdaptiveSchedulerFactory
Canceling
Created
CreatingExecutionGraph
Executing
Failing
Finished
Restarting
WaitingForResources
State
2、 AdaptiveScheduler 调度器本身是一个状态机,不同状态下的调度行为将会对作业产生不同的影响。
3、(Start -> Created)
当 flink standalone 模式启动JobMaster后,AdaptiveScheduler 初始化的时候处于 Created 状态,之后 AdaptiveScheduler 开始执行调度,进入到 WaitingForResources 状态;
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.java
public class AdaptiveScheduler implements SchedulerNG, Created.Context, WaitingForResources.Context, CreatingExecutionGraph.Context, Executing.Context, Restarting.Context, Failing.Context, Finished.Context, StopWithSavepoint.Context { private State state = new Created(this, LOG); // 状态初始化}
(Created -> WaitingForResources)
当有作业提交时, JobMaster 会调用 AdaptiveScheduler 的 startScheduling 方法来启动作业的调度执行;
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.java
@Override public void startScheduling() { state.as(Created.class) .orElseThrow( () -> new IllegalStateException( "Can only start scheduling when being in Created state.")) .startScheduling(); }
在 startScheduling 方法代码段中可以看到, 其实际调用的是相应状态(此时调度器的状态时 Created状态,也就是说使用的是 State 接口的具体实现 Created 类)的 startScheduling 方法,通过查看代码具体实现发现在 Created 类中由调用了 AdaptiveScheduler 的 goToWaitingForResources 方法;
org.apache.flink.runtime.scheduler.adaptive.Created.java
void startScheduling() { context.goToWaitingForResources(); }
这段代码中的 context 对象指的就是 AdaptiveScheduler 对象,意思是将 AdaptiveScheduler 的状态转向到等待资源(WaitingForResources)状态(参考上面的状态机转向图) ;
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.java
@Override public void goToWaitingForResources() { // ..、此处省略的是计算作业所需资源的代码片段 transitionToState( new WaitingForResources.Factory( this, LOG, desiredResources, this.initialResourceAllocationTimeout, this.resourceStabilizationTimeout)); }
在这段代码中有调用了 transitionToState 方法, transitionToState 方法主要功能是将调度器的状态转换到指定的状态,可以看到传入此方法的参数是一个 WaitingForResources.Factory 工厂类, WaitingForResources 状态正是通过该工厂创建的。
org.apache.flink.runtime.scheduler.adaptive.WaitingForResources.java
WaitingForResources( Context context, Logger log, ResourceCounter desiredResources, Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout, Clock clock) { ... context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO); }
在 WaitingForResources 对象的创建过程中,可以看到调用了 context.runIfState 方法, 这个方法有三个参数,第一个参数是当前的这个状态对象,在这里是指 WaitingForResources 状态,第二个参数是将要执行的一个方法,第三个参数是延迟多长时间再执行第二个参数的方法,在这里传的参数值代表零延迟, AdaptiveScheduler 的 runIfState 方法的功能是判断 AdaptiveScheduler 的状态是否为第一个参数所代表的状态,如果是,则执行第二个参数所代表的方法,这段代码可以看到是执行的 WaitingForResources 对象的 notifyNewResourcesAvailable 方法, 在 notifyNewResourcesAvailable 方法中调用了 checkDesiredOrSufficientResourcesAvailable 方法。
org.apache.flink.runtime.scheduler.adaptive.WaitingForResources.java
private void checkDesiredOrSufficientResourcesAvailable() { if (context.hasDesiredResources(desiredResources)) { createExecutionGraphWithAvailableResources(); return; } if (context.hasSufficientResources()) { if (resourceStabilizationDeadline == null) { resourceStabilizationDeadline = Deadline.fromNowWithClock(resourceStabilizationTimeout, clock); } if (resourceStabilizationDeadline.isOverdue()) { createExecutionGraphWithAvailableResources(); } else { // schedule next resource check context.runIfState( this, this::checkDesiredOrSufficientResourcesAvailable, resourceStabilizationDeadline.timeLeft()); } } else { // clear deadline due to insufficient resources resourceStabilizationDeadline = null; } }
在这段代码中,有两个主要的判断,第一个是 context.hasDesiredResources(desiredResources) 的判断,通过 AdaptiveScheduler 的 hasDesiredResources 方法判断当前集群中是否有运行当前作业所需要的资源,如果有,则进入下一个执行阶段。第二个是 context.hasSufficientResources() 的判断,通过 AdaptiveScheduler 的 hasSufficientResources 方法判断当前集群是否有充足的资源至少以最小并行度运行当前作业(每个共享组所分配到的 slot 数 = 当前集群拥有的 slot 数 / 当前作业的 slot 共享组数目),当每个共享组有可以分配到至少一个 slot 时,则说明当前集群有充足的资源。如果当前集群没有充足的资源时,则将资源等待计时器置空,使调度器一直处于等待资源状态直到有新的 TaskManger 加入;如果当前集群有充足的资源时,则判断资源等待是否超时,如果超时,则进入下一个执行阶段,如果没有超时,则使调度器处于等待资源状态并且继续检查集群资源是否满足当前作业的需求。等待资源状态的下一个执行阶段是调用 WaitingForResources 对象的 createExecutionGraphWithAvailableResources 方法来重新创建执行图。五、Reactive 模式上篇总结
AdaptiveScheduler 本身是一个有限状态机,作业启动、停止、重启、资源申请等操作都是在不同状态转换过程中委托给相应的状态实现来完成的。
插曲
下面是本人抖音号(2026775054),欢迎关注,可以将您想要了解的Flink源码部分私信发给我,后续给您录制视频讲解,谢谢支持~~~